Projects >> tomcat60 >>028f5846fa4768db152345e47334989f5198e4ea

Chunk
Conflicting content
import org.apache.coyote.Adapter;
     */
    protected static StringManager sm =
     */
<<<<<<< HEAD
/*
 *  Copyright 1999-2004 The Apache Software Foundation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package org.apache.coyote.http11;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.nio.channels.SocketChannel;
import java.util.StringTokenizer;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

import org.apache.coyote.ActionCode;
import org.apache.coyote.ActionHook;
import org.apache.coyote.Request;
import org.apache.coyote.RequestInfo;
import org.apache.coyote.Response;
import org.apache.coyote.http11.filters.BufferedInputFilter;
import org.apache.coyote.http11.filters.ChunkedInputFilter;
import org.apache.coyote.http11.filters.ChunkedOutputFilter;
import org.apache.coyote.http11.filters.GzipOutputFilter;
import org.apache.coyote.http11.filters.IdentityInputFilter;
import org.apache.coyote.http11.filters.IdentityOutputFilter;
import org.apache.coyote.http11.filters.SavedRequestInputFilter;
import org.apache.coyote.http11.filters.VoidInputFilter;
import org.apache.coyote.http11.filters.VoidOutputFilter;
import org.apache.tomcat.util.buf.Ascii;
import org.apache.tomcat.util.buf.ByteChunk;
import org.apache.tomcat.util.buf.HexUtils;
import org.apache.tomcat.util.buf.MessageBytes;
import org.apache.tomcat.util.http.FastHttpDateFormat;
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.net.NioEndpoint;
import org.apache.tomcat.util.net.NioEndpoint.Handler;
import org.apache.tomcat.util.net.NioEndpoint.Handler.SocketState;
import org.apache.tomcat.util.res.StringManager;
import java.nio.channels.SelectionKey;


/**
 * Processes HTTP requests.
 *
 * @author Remy Maucherat
 * @author Filip Hanik
 */
public class Http11NioProcessor implements ActionHook {


    /**
     * Logger.
     */
    protected static org.apache.commons.logging.Log log
        = org.apache.commons.logging.LogFactory.getLog(Http11NioProcessor.class);

    /**
     * The string manager for this package.
        StringManager.getManager(Constants.Package);


    // ----------------------------------------------------------- Constructors


    public Http11NioProcessor(int headerBufferSize, NioEndpoint endpoint) {

        this.endpoint = endpoint;

        request = new Request();
        int readTimeout = endpoint.getFirstReadTimeout();
        if (readTimeout == 0) {
            readTimeout = 100;
        } else if (readTimeout < 0) {
            readTimeout = timeout;
            //readTimeout = -1;
        }
        inputBuffer = new InternalNioInputBuffer(request, headerBufferSize,readTimeout);
        inputBuffer.setPoller(endpoint.getPoller());
        request.setInputBuffer(inputBuffer);

        response = new Response();
        response.setHook(this);
        outputBuffer = new InternalNioOutputBuffer(response, headerBufferSize);
        response.setOutputBuffer(outputBuffer);
        request.setResponse(response);

        ssl = !"off".equalsIgnoreCase(endpoint.getSSLEngine());

        initializeFilters();

        // Cause loading of HexUtils
        int foo = HexUtils.DEC[0];

        // Cause loading of FastHttpDateFormat
        FastHttpDateFormat.getCurrentDate();

    }


    // ----------------------------------------------------- Instance Variables


    /**
     * Associated adapter.
     */
    protected Adapter adapter = null;


    /**
     * Request object.
     */
    protected Request request = null;


    /**
     * Response object.
     */
    protected Response response = null;


    /**
     * List of restricted user agents.

     * Input.
     */
    protected InternalNioInputBuffer inputBuffer = null;


    /**
     * Output.
     */
    protected InternalNioOutputBuffer outputBuffer = null;


    /**
     * Error flag.
     */
    protected boolean error = false;


    /**
     * Keep-alive.
     */
    protected boolean keepAlive = true;


    /**
     * HTTP/1.1 flag.
     */
    protected boolean http11 = true;


    /**
     * HTTP/0.9 flag.
     */
    protected boolean http09 = false;



    /**
     * Comet used.
     */
    protected boolean comet = false;


    /**
     * Content delimitator for the request (if false, the connection will
     * be closed at the end of the request).
     */
    protected boolean contentDelimitation = true;


    /**
     * Is there an expectation ?
     */
    protected boolean expectation = false;


    /**
    protected Pattern[] restrictedUserAgents = null;


    /**
     * Maximum number of Keep-Alive requests to honor.
     */
    protected int maxKeepAliveRequests = -1;


    /**
     * SSL enabled ?
     */
    protected boolean ssl = false;


    /**
     * Socket associated with the current connection.
     */
    protected SocketChannel socket = null;


    /**
     * Remote Address associated with the current connection.
     */
    protected String remoteAddr = null;


    /**
     * Remote Host associated with the current connection.
     */
    protected String remoteHost = null;


    /**
     * Local Host associated with the current connection.
     */
    protected String localName = null;



    /**
     * Local port to which the socket is connected
     */
    protected int localPort = -1;


    /**
     * Remote port to which the socket is connected
     */
    protected int remotePort = -1;


    /**
     * The local Host address.
     */
    protected String localAddr = null;


    /**
     * Maximum timeout on uploads. 5 minutes as in Apache HTTPD server.
     */
    protected int timeout = 300000;


    /**
     * Flag to disable setting a different time-out on uploads.
     */
    protected boolean disableUploadTimeout = false;
        }

    /**
     * Allowed compression level.
     */
    protected int compressionLevel = 0;


    /**
     * Minimum contentsize to make compression.
     */
    protected int compressionMinSize = 2048;


    /**
     * Socket buffering.
     */
    protected int socketBuffer = -1;


    /**
     * Max save post size.
     */
    protected int maxSavePostSize = 4 * 1024;


    /**
     * List of user agents to not use gzip with
     */
    protected Pattern noCompressionUserAgents[] = null;

    /**
     * List of MIMES which could be gzipped
     */
    protected String[] compressableMimeTypes =
    { "text/html", "text/xml", "text/plain" };


    /**
     * Host name (used to avoid useless B2C conversion on the host name).
     */
    protected char[] hostNameC = new char[0];


    /**
     * Associated endpoint.
     */
    protected NioEndpoint endpoint;


    /**
     * Allow a customized the server header for the tin-foil hat folks.
     */
    protected String server = null;


    // ------------------------------------------------------------- Properties


    /**
     * Return compression level.
     */
    public String getCompression() {
        switch (compressionLevel) {
        case 0:
            return "off";
        case 1:
            return "on";
        case 2:
            return "force";
        }
        return "off";
    }


    /**
     * Set compression level.
     */
    public void setCompression(String compression) {
        if (compression.equals("on")) {
            this.compressionLevel = 1;
        } else if (compression.equals("force")) {
            this.compressionLevel = 2;
        } else if (compression.equals("off")) {
            this.compressionLevel = 0;
        } else {
            try {
                // Try to parse compression as an int, which would give the
                // minimum compression size
                compressionMinSize = Integer.parseInt(compression);
                this.compressionLevel = 1;
            } catch (Exception e) {
                this.compressionLevel = 0;
            }
        }
    }

    /**
     * Set Minimum size to trigger compression.
     */
    public void setCompressionMinSize(int compressionMinSize) {
        this.compressionMinSize = compressionMinSize;
    }


    /**
     * Add user-agent for which gzip compression didn't works
     * The user agent String given will be exactly matched
     * to the user-agent header submitted by the client.
     *
     * @param userAgent user-agent string
     */
    public void addNoCompressionUserAgent(String userAgent) {
        try {
            Pattern nRule = Pattern.compile(userAgent);
            noCompressionUserAgents =
                addREArray(noCompressionUserAgents, nRule);
        } catch (PatternSyntaxException pse) {
            log.error(sm.getString("http11processor.regexp.error", userAgent), pse);
        }
    }


    /**
     * Set no compression user agent list (this method is best when used with
     * a large number of connectors, where it would be better to have all of
     * them referenced a single array).
     */
    public void setNoCompressionUserAgents(Pattern[] noCompressionUserAgents) {
        this.noCompressionUserAgents = noCompressionUserAgents;
    }


    /**
     * Set no compression user agent list.
     * List contains users agents separated by ',' :
     *
     * ie: "gorilla,desesplorer,tigrus"
     */
    public void setNoCompressionUserAgents(String noCompressionUserAgents) {
        if (noCompressionUserAgents != null) {
            StringTokenizer st = new StringTokenizer(noCompressionUserAgents, ",");

            while (st.hasMoreTokens()) {
                addNoCompressionUserAgent(st.nextToken().trim());
            }
        }
    }

    /**
     * Add a mime-type which will be compressable
     * The mime-type String will be exactly matched
     * in the response mime-type header .
     *
     * @param mimeType mime-type string
     */
    public void addCompressableMimeType(String mimeType) {
        compressableMimeTypes =
            addStringArray(compressableMimeTypes, mimeType);
    }


    /**
     * Set compressable mime-type list (this method is best when used with
     * a large number of connectors, where it would be better to have all of
     * them referenced a single array).
     */
    public void setCompressableMimeTypes(String[] compressableMimeTypes) {
        this.compressableMimeTypes = compressableMimeTypes;
    }


    /**
     * Set compressable mime-type list
     * List contains users agents separated by ',' :
     *
     * ie: "text/html,text/xml,text/plain"
     */
    public void setCompressableMimeTypes(String compressableMimeTypes) {
        if (compressableMimeTypes != null) {
            StringTokenizer st = new StringTokenizer(compressableMimeTypes, ",");

            while (st.hasMoreTokens()) {
                addCompressableMimeType(st.nextToken().trim());
            }
        }
    }


    /**
     * Return the list of restricted user agents.
     */
    public String[] findCompressableMimeTypes() {
        return (compressableMimeTypes);
    }



    // --------------------------------------------------------- Public Methods


    /**
     * Add input or output filter.
     *
     * @param className class name of the filter
     */
    protected void addFilter(String className) {
        try {
            Class clazz = Class.forName(className);
            Object obj = clazz.newInstance();
            if (obj instanceof InputFilter) {
                inputBuffer.addFilter((InputFilter) obj);
            } else if (obj instanceof OutputFilter) {
                outputBuffer.addFilter((OutputFilter) obj);
            } else {
                log.warn(sm.getString("http11processor.filter.unknown", className));
            }
        } catch (Exception e) {
            log.error(sm.getString("http11processor.filter.error", className), e);
     * Set restricted user agent list (which will downgrade the connector
    }
    }


    /**
     * General use method
     *
     * @param sArray the StringArray
     * @param value string
     */
    private String[] addStringArray(String sArray[], String value) {
        String[] result = null;
        if (sArray == null) {
            result = new String[1];
            result[0] = value;
        }
        else {
            result = new String[sArray.length + 1];
            for (int i = 0; i < sArray.length; i++)
                result[i] = sArray[i];
            result[sArray.length] = value;
        }
        return result;
    }


    /**
     * General use method
     *
     * @param rArray the REArray
     * @param value Obj
     */
    private Pattern[] addREArray(Pattern rArray[], Pattern value) {
        Pattern[] result = null;
        if (rArray == null) {
            result = new Pattern[1];
            result[0] = value;
        }
        else {
            result = new Pattern[rArray.length + 1];
            for (int i = 0; i < rArray.length; i++)
                result[i] = rArray[i];
            result[rArray.length] = value;
        }
        return result;
    }


    /**
     * General use method
     *
     * @param sArray the StringArray
     * @param value string
     */
    private boolean inStringArray(String sArray[], String value) {
        for (int i = 0; i < sArray.length; i++) {
            if (sArray[i].equals(value)) {
                return true;
            }
        }
        return false;
    }


    /**
     * Checks if any entry in the string array starts with the specified value
     *
     * @param sArray the StringArray
     * @param value string
     */
    private boolean startsWithStringArray(String sArray[], String value) {
        if (value == null)
           return false;
        for (int i = 0; i < sArray.length; i++) {
            if (value.startsWith(sArray[i])) {
                return true;
            }
        }
        return false;
    }


    /**
     * Add restricted user-agent (which will downgrade the connector
     * to HTTP/1.0 mode). The user agent String given will be matched
     * via regexp to the user-agent header submitted by the client.
     *
     * @param userAgent user-agent string
     */
    public void addRestrictedUserAgent(String userAgent) {
        try {
            Pattern nRule = Pattern.compile(userAgent);
            restrictedUserAgents = addREArray(restrictedUserAgents, nRule);
        } catch (PatternSyntaxException pse) {
            log.error(sm.getString("http11processor.regexp.error", userAgent), pse);
        }
    }


    /**
     * Set restricted user agent list (this method is best when used with
     * a large number of connectors, where it would be better to have all of
     * them referenced a single array).
     */
    public void setRestrictedUserAgents(Pattern[] restrictedUserAgents) {
        this.restrictedUserAgents = restrictedUserAgents;
    }


    /**
     * Set the maximum size of a POST which will be buffered in SSL mode.
     */
     * to HTTP/1.0 mode). List contains users agents separated by ',' :
     *
     * ie: "gorilla,desesplorer,tigrus"
     */
    public void setRestrictedUserAgents(String restrictedUserAgents) {
        if (restrictedUserAgents != null) {
            StringTokenizer st =
                new StringTokenizer(restrictedUserAgents, ",");
            while (st.hasMoreTokens()) {
                addRestrictedUserAgent(st.nextToken().trim());
            }
        }
    }


    /**
     * Return the list of restricted user agents.
     */
    public String[] findRestrictedUserAgents() {
        String[] sarr = new String [restrictedUserAgents.length];

        for (int i = 0; i < restrictedUserAgents.length; i++)
            sarr[i] = restrictedUserAgents[i].toString();

        return (sarr);
    }


    /**
     * Set the maximum number of Keep-Alive requests to honor.
     * This is to safeguard from DoS attacks.  Setting to a negative
     * value disables the check.
     */
    public void setMaxKeepAliveRequests(int mkar) {
        maxKeepAliveRequests = mkar;
    }


    /**
     * Return the number of Keep-Alive requests that we will honor.
     */
    public int getMaxKeepAliveRequests() {
        return maxKeepAliveRequests;
    }


    /**
    public void setMaxSavePostSize(int msps) {
        maxSavePostSize = msps;
    }


    /**
     * Return the maximum size of a POST which will be buffered in SSL mode.
     */
    public int getMaxSavePostSize() {
        return maxSavePostSize;
    }


    /**
     * Set the flag to control upload time-outs.
     */
    public void setDisableUploadTimeout(boolean isDisabled) {
        disableUploadTimeout = isDisabled;
    }

    /**
     * Get the flag that controls upload time-outs.
     */
    public boolean getDisableUploadTimeout() {
        return disableUploadTimeout;
    }

    /**
     * Set the socket buffer flag.
     */
    public void setSocketBuffer(int socketBuffer) {
        this.socketBuffer = socketBuffer;
        outputBuffer.setSocketBuffer(socketBuffer);
    }

    /**
     * Get the socket buffer flag.
     */
    public int getSocketBuffer() {
        return socketBuffer;
    }

    /**
     * Set the upload timeout.
     */
    public void setTimeout( int timeouts ) {
        timeout = timeouts ;
    }

    /**
     * Get the upload timeout.
     */
    public int getTimeout() {
        return timeout;
    }


    /**
     * Set the server header name.
     */
    public void setServer( String server ) {
        if (server==null || server.equals("")) {
            this.server = null;
        } else {
            this.server = server;
        }

    /**
     * Get the server header name.
     */
    public String getServer() {
        return server;
    }


    /** Get the request associated with this processor.
     *
     * @return The request
     */
    public Request getRequest() {
        return request;
    }

    /**
     * Process pipelined HTTP requests using the specified input and output
     * streams.
     *
     * @throws IOException error during an I/O operation
     */
    public SocketState event(boolean error)
        throws IOException {

        RequestInfo rp = request.getRequestProcessor();

        try {
            rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
            error = !adapter.event(request, response, error);
            if (request.getAttribute("org.apache.tomcat.comet") == null) {
                comet = false;
            }
            SelectionKey key = socket.keyFor(endpoint.getPoller().getSelector());
            if ( key != null ) {
                NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
                if ( attach!=null ) {
                    attach.setComet(comet);
                    Integer comettimeout = (Integer)request.getAttribute("org.apache.tomcat.comet.timeout");
                    if ( comettimeout != null ) attach.setTimeout(comettimeout.longValue());
                }
            }
            
        } catch (InterruptedIOException e) {
            error = true;
        } catch (Throwable t) {
            log.error(sm.getString("http11processor.request.process"), t);
            // 500 - Internal Server Error
            response.setStatus(500);
            error = true;
        }

        rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

        if (error) {
            recycle();
            return SocketState.CLOSED;
        } else if (!comet) {
            recycle();
            endpoint.getPoller().add(socket);
            return SocketState.OPEN;
        } else {
            endpoint.getCometPoller().add(socket);
            return SocketState.LONG;
        }
    }

    /**
     * Process pipelined HTTP requests using the specified input and output
     * streams.
     *
     * @throws IOException error during an I/O operation
     */
    public SocketState process(SocketChannel socket)
        throws IOException {
        RequestInfo rp = request.getRequestProcessor();
        rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

        // Set the remote address
        remoteAddr = null;
        remoteHost = null;
        localAddr = null;
        localName = null;
        remotePort = -1;
        localPort = -1;

        // Setting up the socket
        this.socket = socket;
        inputBuffer.setSocket(socket);
        outputBuffer.setSocket(socket);
        outputBuffer.setSelector(endpoint.getPoller().getSelector());

        // Error flag
        error = false;
        keepAlive = true;

        int keepAliveLeft = maxKeepAliveRequests;
        long soTimeout = endpoint.getSoTimeout();

        int limit = 0;
        if (endpoint.getFirstReadTimeout() > 0 || endpoint.getFirstReadTimeout() < -1) {
            limit = endpoint.getMaxThreads() / 2;
        }
                } catch (Throwable t) {

        boolean keptAlive = false;
        boolean openSocket = false;

        while (!error && keepAlive && !comet) {

            // Parsing the request header
            try {
                if( !disableUploadTimeout && keptAlive && soTimeout > 0 ) {
                    socket.socket().setSoTimeout((int)soTimeout);
                    inputBuffer.readTimeout = soTimeout;
                }
                if (!inputBuffer.parseRequestLine
                        (keptAlive && (endpoint.getCurrentThreadsBusy() > limit))) {
                    // This means that no data is available right now
                    // (long keepalive), so that the processor should be recycled
                    // and the method should return true
                    openSocket = true;
                    // Add the socket to the poller
                    endpoint.getPoller().add(socket);
                    break;
                }
                request.setStartTime(System.currentTimeMillis());
                keptAlive = true;
                if (!disableUploadTimeout) {
                    socket.socket().setSoTimeout((int)timeout);
                    inputBuffer.readTimeout = soTimeout;
                }
                inputBuffer.parseHeaders();
            } catch (IOException e) {
                error = true;
                break;
            } catch (Throwable t) {
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("http11processor.header.parse"), t);
                }
                // 400 - Bad Request
                response.setStatus(400);
                error = true;
            }

            // Setting up filters, and parse some request headers
            rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
            try {
                prepareRequest();
            } catch (Throwable t) {
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("http11processor.request.prepare"), t);
                }
                // 400 - Internal Server Error
                response.setStatus(400);
                error = true;
            }

            if (maxKeepAliveRequests > 0 && --keepAliveLeft == 0)
                keepAlive = false;

            // Process the request in the adapter
            if (!error) {
                try {
                    rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                    adapter.service(request, response);
                    // Handle when the response was committed before a serious
                    // error occurred.  Throwing a ServletException should both
                    // set the status to 500 and set the errorException.
                    // If we fail here, then the response is likely already
                    // committed, so we can't try and set headers.
                    if(keepAlive && !error) { // Avoid checking twice.
                        error = response.getErrorException() != null ||
                                statusDropsConnection(response.getStatus());
                    }
                    // Comet support
                    if (request.getAttribute("org.apache.tomcat.comet") != null) {
                        comet = true;
                    }
                    SelectionKey key = socket.keyFor(endpoint.getPoller().getSelector());
                    if (key != null) {
                        NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
                        if (attach != null)  {
                            attach.setComet(comet);
                            Integer comettimeout = (Integer) request.getAttribute("org.apache.tomcat.comet.timeout");
                            if (comettimeout != null) attach.setTimeout(comettimeout.longValue());
                        }
                    }
                } catch (InterruptedIOException e) {
                    error = true;
        } catch (Throwable t) {

                    log.error(sm.getString("http11processor.request.process"), t);
                    // 500 - Internal Server Error
                    response.setStatus(500);
                    error = true;
                }
            }

            // Finish the handling of the request
            if (!comet) {
                endRequest();
            }

            // If there was an error, make sure the request is counted as
            // and error, and update the statistics counter
            if (error) {
                response.setStatus(500);
            }
            request.updateCounters();

            rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);

        }

        rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

        if (comet) {
            if (error) {
                recycle();
                return SocketState.CLOSED;
            } else {
                return SocketState.LONG;
            }
        } else {
            recycle();
            return (openSocket) ? SocketState.OPEN : SocketState.CLOSED;
        }

    }


    public void endRequest() {

        // Finish the handling of the request
        try {
            inputBuffer.endRequest();
        } catch (IOException e) {
            error = true;
            log.error(sm.getString("http11processor.request.finish"), t);
            // 500 - Internal Server Error
            response.setStatus(500);
            error = true;
        }
        try {
            outputBuffer.endRequest();
        } catch (IOException e) {
            error = true;
        } catch (Throwable t) {
            log.error(sm.getString("http11processor.response.finish"), t);
            error = true;
        }

        // Next request
        inputBuffer.nextRequest();
        outputBuffer.nextRequest();

    }


    public void recycle() {
        inputBuffer.recycle();
        outputBuffer.recycle();
        this.socket = null;
    }


    // ----------------------------------------------------- ActionHook Methods


    /**
     * Send an action to the connector.
     *
     * @param actionCode Type of the action
     * @param param Action parameter
     */
    public void action(ActionCode actionCode, Object param) {

        if (actionCode == ActionCode.ACTION_COMMIT) {
            // Commit current response

            if (response.isCommitted())
                return;

            // Validate and write response headers
            prepareResponse();
            try {
                outputBuffer.commit();
            } catch (IOException e) {
                // Set error flag
                error = true;
            }

        } else if (actionCode == ActionCode.ACTION_ACK) {

            // Acknowlege request

            // Send a 100 status back if it makes sense (response not committed
            // yet, and client specified an expectation for 100-continue)
            if ((response.isCommitted()) || !expectation)
                return;

            inputBuffer.setSwallowInput(true);
            try {
                outputBuffer.sendAck();
            } catch (IOException e) {
                // Set error flag
                error = true;
            }

        } else if (actionCode == ActionCode.ACTION_CLIENT_FLUSH) {

            try {
                outputBuffer.flush();
            } catch (IOException e) {
                // Set error flag
                error = true;
                response.setErrorException(e);
            }

        } else if (actionCode == ActionCode.ACTION_CLOSE) {
            // Close

            // End the processing of the current request, and stop any further
            // transactions with the client

            comet = false;
            try {
                outputBuffer.endRequest();
            } catch (IOException e) {
                // Set error flag
                error = true;
            }

        } else if (actionCode == ActionCode.ACTION_RESET) {

            // Reset response

            // Note: This must be called before the response is committed

            outputBuffer.reset();

        } else if (actionCode == ActionCode.ACTION_CUSTOM) {

            // Do nothing

        } else if (actionCode == ActionCode.ACTION_REQ_HOST_ADDR_ATTRIBUTE) {

            // Get remote host address
            if ((remoteAddr == null) && (socket != null)) {
                InetAddress inetAddr = socket.socket().getInetAddress();
                if (inetAddr != null) {
                    remoteAddr = inetAddr.getHostAddress();
                }
            }
            request.remoteAddr().setString(remoteAddr);

        } else if (actionCode == ActionCode.ACTION_REQ_LOCAL_NAME_ATTRIBUTE) {

            // Get local host name
            if ((localName == null) && (socket != null)) {
                InetAddress inetAddr = socket.socket().getLocalAddress();
                if (inetAddr != null) {
                    localName = inetAddr.getHostName();
                }
            }
            request.localName().setString(localName);

        } else if (actionCode == ActionCode.ACTION_REQ_HOST_ATTRIBUTE) {

            // Get remote host name
            if ((remoteHost == null) && (socket != null)) {
                InetAddress inetAddr = socket.socket().getInetAddress();
                if (inetAddr != null) {
                    remoteHost = inetAddr.getHostName();
                }
                if(remoteHost == null) {
                    if(remoteAddr != null) {
                        remoteHost = remoteAddr;
                    } else { // all we can do is punt
                        request.remoteHost().recycle();
                    }
                }
            }
            request.remoteHost().setString(remoteHost);

        } else if (actionCode == ActionCode.ACTION_REQ_LOCAL_ADDR_ATTRIBUTE) {

            if (localAddr == null)
               localAddr = socket.socket().getLocalAddress().getHostAddress();

            request.localAddr().setString(localAddr);

        } else if (actionCode == ActionCode.ACTION_REQ_REMOTEPORT_ATTRIBUTE) {

            if ((remotePort == -1 ) && (socket !=null)) {
                remotePort = socket.socket().getPort();
            }
//                    if (sslO != null) {
        return false;
        }
//                            (NioEndpoint.CERTIFICATE_KEY, certs);
//                    }
//                } catch (Exception e) {
            request.setRemotePort(remotePort);

        } else if (actionCode == ActionCode.ACTION_REQ_LOCALPORT_ATTRIBUTE) {

            if ((localPort == -1 ) && (socket !=null)) {
                localPort = socket.socket().getLocalPort();
            }
            request.setLocalPort(localPort);

        } else if (actionCode == ActionCode.ACTION_REQ_SSL_ATTRIBUTE ) {

//            if (ssl && (socket != 0)) {
//                try {
//                    // Cipher suite
//                    Object sslO = SSLSocket.getInfoS(socket, SSL.SSL_INFO_CIPHER);
//                    if (sslO != null) {
//                        request.setAttribute
//                            (NioEndpoint.CIPHER_SUITE_KEY, sslO);
//                    }
//                    // Client certificate chain if present
//                    int certLength = SSLSocket.getInfoI(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN);
//                    X509Certificate[] certs = null;
//                    if (certLength > 0) {
//                        certs = new X509Certificate[certLength];
//                        for (int i = 0; i < certLength; i++) {
//                            byte[] data = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN + i);
//                            CertificateFactory cf =
//                                CertificateFactory.getInstance("X.509");
//                            ByteArrayInputStream stream = new ByteArrayInputStream(data);
//                            certs[i] = (X509Certificate) cf.generateCertificate(stream);
//                        }
//                    }
//                    if (certs != null) {
//                        request.setAttribute
//                            (NioEndpoint.CERTIFICATE_KEY, certs);
//                    }
//                    // User key size
//                    sslO = new Integer(SSLSocket.getInfoI(socket, SSL.SSL_INFO_CIPHER_USEKEYSIZE));
//                        request.setAttribute
//                            (NioEndpoint.KEY_SIZE_KEY, sslO);
//                    }
//                    // SSL session ID
//                    sslO = SSLSocket.getInfoS(socket, SSL.SSL_INFO_SESSION_ID);
//                    if (sslO != null) {
//                        request.setAttribute
//                            (NioEndpoint.SESSION_ID_KEY, sslO);
//                    }
//                } catch (Exception e) {
//                    log.warn(sm.getString("http11processor.socket.ssl"), e);
//                }
//            }

        } else if (actionCode == ActionCode.ACTION_REQ_SSL_CERTIFICATE) {

//            if (ssl && (socket != 0)) {
//                 // Consume and buffer the request body, so that it does not
//                 // interfere with the client's handshake messages
//                InputFilter[] inputFilters = inputBuffer.getFilters();
//                ((BufferedInputFilter) inputFilters[Constants.BUFFERED_FILTER])
//                    .setLimit(maxSavePostSize);
//                inputBuffer.addActiveFilter
//                    (inputFilters[Constants.BUFFERED_FILTER]);
//                try {
//                    // Renegociate certificates
//                    SSLSocket.renegotiate(socket);
//                    // Client certificate chain if present
//                    int certLength = SSLSocket.getInfoI(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN);
//                    X509Certificate[] certs = null;
//                    if (certLength > 0) {
//                        certs = new X509Certificate[certLength];
//                        for (int i = 0; i < certLength; i++) {
//                            byte[] data = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN + i);
//                            CertificateFactory cf =
//                                CertificateFactory.getInstance("X.509");
//                            ByteArrayInputStream stream = new ByteArrayInputStream(data);
//                            certs[i] = (X509Certificate) cf.generateCertificate(stream);
//                        }
//                    }
//                    if (certs != null) {
//                        request.setAttribute

    protected void prepareRequest() {

        http11 = true;
        http09 = false;
        contentDelimitation = false;
        expectation = false;
        if (ssl) {
//                    log.warn(sm.getString("http11processor.socket.ssl"), e);
//                }
//            }

        } else if (actionCode == ActionCode.ACTION_REQ_SET_BODY_REPLAY) {
            ByteChunk body = (ByteChunk) param;

            InputFilter savedBody = new SavedRequestInputFilter(body);
            savedBody.setRequest(request);

            InternalNioInputBuffer internalBuffer = (InternalNioInputBuffer)
                request.getInputBuffer();
            internalBuffer.addActiveFilter(savedBody);
        }

    }


    // ------------------------------------------------------ Connector Methods


    /**
     * Set the associated adapter.
     *
     * @param adapter the new adapter
     */
    public void setAdapter(Adapter adapter) {
        this.adapter = adapter;
    }


    /**
     * Get the associated adapter.
     *
     * @return the associated adapter
     */
    public Adapter getAdapter() {
        return adapter;
    }


    // ------------------------------------------------------ Protected Methods


    /**
     * After reading the request headers, we have to setup the request filters.
     */
            request.scheme().setString("https");
        }
        MessageBytes protocolMB = request.protocol();
        if (protocolMB.equals(Constants.HTTP_11)) {
            http11 = true;
            protocolMB.setString(Constants.HTTP_11);
        } else if (protocolMB.equals(Constants.HTTP_10)) {
            http11 = false;
            keepAlive = false;
            protocolMB.setString(Constants.HTTP_10);
        } else if (protocolMB.equals("")) {
            // HTTP/0.9
            http09 = true;
            http11 = false;
            keepAlive = false;
        } else {
            // Unsupported protocol
            http11 = false;
            error = true;
            // Send 505; Unsupported HTTP version
            response.setStatus(505);
        }

        MessageBytes methodMB = request.method();
        if (methodMB.equals(Constants.GET)) {
            methodMB.setString(Constants.GET);
        } else if (methodMB.equals(Constants.POST)) {
            methodMB.setString(Constants.POST);
        }

        MimeHeaders headers = request.getMimeHeaders();

        // Check connection header
        MessageBytes connectionValueMB = headers.getValue("connection");
        if (connectionValueMB != null) {
            ByteChunk connectionValueBC = connectionValueMB.getByteChunk();
            if (findBytes(connectionValueBC, Constants.CLOSE_BYTES) != -1) {
                keepAlive = false;
            } else if (findBytes(connectionValueBC,
                                 Constants.KEEPALIVE_BYTES) != -1) {
                keepAlive = true;
            }
        }

        MessageBytes expectMB = null;
        if (http11)
            expectMB = headers.getValue("expect");
        if ((expectMB != null)
            && (expectMB.indexOfIgnoreCase("100-continue", 0) != -1)) {
            inputBuffer.setSwallowInput(false);
            expectation = true;
        }

        // Check user-agent header
        if ((restrictedUserAgents != null) && ((http11) || (keepAlive))) {

            MessageBytes userAgentValueMB = headers.getValue("user-agent");
            // Check in the restricted list, and adjust the http11
            // and keepAlive flags accordingly
            if(userAgentValueMB != null) {
                String userAgentValue = userAgentValueMB.toString();
                for (int i = 0; i < restrictedUserAgents.length; i++) {
                    if (restrictedUserAgents[i].matcher(userAgentValue).matches()) {
                        http11 = false;
                        keepAlive = false;
                        break;
                    }
                }
            }
        }

        // Check for a full URI (including protocol://host:port/)
        ByteChunk uriBC = request.requestURI().getByteChunk();
        if (uriBC.startsWithIgnoreCase("http", 0)) {

            int pos = uriBC.indexOf("://", 0, 3, 4);
            int uriBCStart = uriBC.getStart();
            int slashPos = -1;
            if (pos != -1) {
                byte[] uriB = uriBC.getBytes();
                slashPos = uriBC.indexOf('/', pos + 3);
                if (slashPos == -1) {
                    slashPos = uriBC.getLength();
                    // Set URI as "/"
                    request.requestURI().setBytes
                        (uriB, uriBCStart + pos + 1, 1);
                } else {
                    request.requestURI().setBytes
                        (uriB, uriBCStart + slashPos,
                         uriBC.getLength() - slashPos);
                }
                MessageBytes hostMB = headers.setValue("host");
                hostMB.setBytes(uriB, uriBCStart + pos + 3,
                                slashPos - pos - 3);
            }

        }

        // Input filter setup
        InputFilter[] inputFilters = inputBuffer.getFilters();
        // Parse transfer-encoding header
        MessageBytes transferEncodingValueMB = null;
        if (http11)
            transferEncodingValueMB = headers.getValue("transfer-encoding");
        if (transferEncodingValueMB != null) {
            String transferEncodingValue = transferEncodingValueMB.toString();
            // Parse the comma separated list. "identity" codings are ignored
            int startPos = 0;
            int commaPos = transferEncodingValue.indexOf(',');
            String encodingName = null;
            while (commaPos != -1) {
                encodingName = transferEncodingValue.substring
                    (startPos, commaPos).toLowerCase().trim();
                if (!addInputFilter(inputFilters, encodingName)) {
                    // Unsupported transfer encoding
                    error = true;
                    // 501 - Unimplemented
                    response.setStatus(501);
                }
                startPos = commaPos + 1;
                commaPos = transferEncodingValue.indexOf(',', startPos);
            }
            encodingName = transferEncodingValue.substring(startPos)
                .toLowerCase().trim();
            if (!addInputFilter(inputFilters, encodingName)) {
                // Unsupported transfer encoding
                error = true;
                // 501 - Unimplemented
                response.setStatus(501);
            }
        }

        // Parse content-length header
        long contentLength = request.getContentLengthLong();
        if (contentLength >= 0 && !contentDelimitation) {
            inputBuffer.addActiveFilter
                (inputFilters[Constants.IDENTITY_FILTER]);
            contentDelimitation = true;
        }

        MessageBytes valueMB = headers.getValue("host");

        // Check host header
        if (http11 && (valueMB == null)) {
            error = true;
            // 400 - Bad request
            response.setStatus(400);
        parseHost(valueMB);

        if (!contentDelimitation) {
            // If there's no content length 
            // (broken HTTP/1.0 or HTTP/1.1), assume
            // the client is not broken and didn't send a body
            inputBuffer.addActiveFilter
                    (inputFilters[Constants.VOID_FILTER]);
            contentDelimitation = true;
        }

        // Advertise comet support through a request attribute
        request.setAttribute("org.apache.tomcat.comet.support", Boolean.TRUE);
        // Advertise comet timeout support
        request.setAttribute("org.apache.tomcat.comet.timeout.support", Boolean.TRUE);

    }


    /**
     * Parse host.
     */
    public void parseHost(MessageBytes valueMB) {

        if (valueMB == null || valueMB.isNull()) {
            // HTTP/1.0
            // Default is what the socket tells us. Overriden if a host is
            // found/parsed
            request.setServerPort(endpoint.getPort());
            return;
        }

        ByteChunk valueBC = valueMB.getByteChunk();
        byte[] valueB = valueBC.getBytes();
        int valueL = valueBC.getLength();
        int valueS = valueBC.getStart();
        int colonPos = -1;
        if (hostNameC.length < valueL) {
            hostNameC = new char[valueL];
        }

        boolean ipv6 = (valueB[valueS] == '[');
        boolean bracketClosed = false;
        for (int i = 0; i < valueL; i++) {
            char b = (char) valueB[i + valueS];
            hostNameC[i] = b;
            if (b == ']') {
                bracketClosed = true;
            } else if (b == ':') {
                if (!ipv6 || bracketClosed) {
    }
                    colonPos = i;
                    break;
                }
            }
        }

        if (colonPos < 0) {
            if (!ssl) {
                // 80 - Default HTTP port
                request.setServerPort(80);
            } else {
                // 443 - Default HTTPS port
                request.setServerPort(443);
            }
            request.serverName().setChars(hostNameC, 0, valueL);
        } else {

            request.serverName().setChars(hostNameC, 0, colonPos);

            int port = 0;
            int mult = 1;
            for (int i = valueL - 1; i > colonPos; i--) {
                int charValue = HexUtils.DEC[(int) valueB[i + valueS]];
                if (charValue == -1) {
                    // Invalid character
                    error = true;
                    // 400 - Bad request
                    response.setStatus(400);
                    break;
                }
                port = port + (charValue * mult);
                mult = 10 * mult;
            }
            request.setServerPort(port);

        }

    }


    /**
     * Check for compression
     */
    private boolean isCompressable() {

        // Nope Compression could works in HTTP 1.0 also
        // cf: mod_deflate

        // Compression only since HTTP 1.1
        // if (! http11)
        //    return false;

        // Check if browser support gzip encoding
        MessageBytes acceptEncodingMB =


    /**
     * When committing the response, we have to validate the set of headers, as
            request.getMimeHeaders().getValue("accept-encoding");

        if ((acceptEncodingMB == null)
            || (acceptEncodingMB.indexOf("gzip") == -1))
            return false;

        // Check if content is not allready gzipped
        MessageBytes contentEncodingMB =
            response.getMimeHeaders().getValue("Content-Encoding");

        if ((contentEncodingMB != null)
            && (contentEncodingMB.indexOf("gzip") != -1))
            return false;

        // If force mode, allways compress (test purposes only)
        if (compressionLevel == 2)
           return true;

        // Check for incompatible Browser
        if (noCompressionUserAgents != null) {
            MessageBytes userAgentValueMB =
                request.getMimeHeaders().getValue("user-agent");
            if(userAgentValueMB != null) {
                String userAgentValue = userAgentValueMB.toString();

                // If one Regexp rule match, disable compression
                for (int i = 0; i < noCompressionUserAgents.length; i++)
                    if (noCompressionUserAgents[i].matcher(userAgentValue).matches())
                        return false;
            }
        }

        // Check if suffisant len to trig the compression
        long contentLength = response.getContentLengthLong();
        if ((contentLength == -1)
            || (contentLength > compressionMinSize)) {
            // Check for compatible MIME-TYPE
            if (compressableMimeTypes != null) {
                return (startsWithStringArray(compressableMimeTypes,
                                              response.getContentType()));
            }
        }

     * well as setup the response filters.
     */
    protected void prepareResponse() {

        boolean entityBody = true;
        contentDelimitation = false;

        OutputFilter[] outputFilters = outputBuffer.getFilters();

        if (http09 == true) {
            // HTTP/0.9
            outputBuffer.addActiveFilter
                (outputFilters[Constants.IDENTITY_FILTER]);
            return;
        }

        int statusCode = response.getStatus();
        if ((statusCode == 204) || (statusCode == 205)
            || (statusCode == 304)) {
            // No entity body
            outputBuffer.addActiveFilter
                (outputFilters[Constants.VOID_FILTER]);
            entityBody = false;
            contentDelimitation = true;
        }

        MessageBytes methodMB = request.method();
        if (methodMB.equals("HEAD")) {
            // No entity body
            outputBuffer.addActiveFilter
                (outputFilters[Constants.VOID_FILTER]);
            contentDelimitation = true;
        }


        // Check for compression
        boolean useCompression = false;
        if (entityBody && (compressionLevel > 0)) {
            useCompression = isCompressable();
            // Change content-length to -1 to force chunking
            if (useCompression) {
                response.setContentLength(-1);
            }
        }

        MimeHeaders headers = response.getMimeHeaders();
        if (!entityBody) {
            response.setContentLength(-1);
        } else {
            String contentType = response.getContentType();
        outputBuffer.addFilter(new GzipOutputFilter());
            if (contentType != null) {
                headers.setValue("Content-Type").setString(contentType);
            }
            String contentLanguage = response.getContentLanguage();
            if (contentLanguage != null) {
                headers.setValue("Content-Language")
                    .setString(contentLanguage);
            }
        }

        long contentLength = response.getContentLengthLong();
        if (contentLength != -1) {
            headers.setValue("Content-Length").setLong(contentLength);
            outputBuffer.addActiveFilter
                (outputFilters[Constants.IDENTITY_FILTER]);
            contentDelimitation = true;
        } else {
            if (entityBody && http11 && keepAlive) {
                outputBuffer.addActiveFilter
                    (outputFilters[Constants.CHUNKED_FILTER]);
                contentDelimitation = true;
                headers.addValue(Constants.TRANSFERENCODING).setString(Constants.CHUNKED);
            } else {
                outputBuffer.addActiveFilter
                    (outputFilters[Constants.IDENTITY_FILTER]);
            }
        }

        if (useCompression) {
            outputBuffer.addActiveFilter(outputFilters[Constants.GZIP_FILTER]);
            headers.setValue("Content-Encoding").setString("gzip");
            // Make Proxies happy via Vary (from mod_deflate)
            headers.setValue("Vary").setString("Accept-Encoding");
        }

        // Add date header
        headers.setValue("Date").setString(FastHttpDateFormat.getCurrentDate());

        // FIXME: Add transfer encoding header

        if ((entityBody) && (!contentDelimitation)) {
            // Mark as close the connection after the request, and add the
            // connection: close header
            keepAlive = false;
        }

        // If we know that the request is bad this early, add the
        // Connection: close header.
        keepAlive = keepAlive && !statusDropsConnection(statusCode);
        if (!keepAlive) {
            headers.addValue(Constants.CONNECTION).setString(Constants.CLOSE);
        } else if (!http11 && !error) {
            headers.addValue(Constants.CONNECTION).setString(Constants.KEEPALIVE);
        }

        // Build the response header
        outputBuffer.sendStatus();

        // Add server header
        if (server != null) {
            headers.setValue("Server").setString(server);
        } else {
            outputBuffer.write(Constants.SERVER_BYTES);
        }

        int size = headers.size();
        for (int i = 0; i < size; i++) {
            outputBuffer.sendHeader(headers.getName(i), headers.getValue(i));
        }
        outputBuffer.endHeaders();

    }


    /**
     * Initialize standard input and output filters.
     */
    protected void initializeFilters() {

        // Create and add the identity filters.
        inputBuffer.addFilter(new IdentityInputFilter());
        outputBuffer.addFilter(new IdentityOutputFilter());

        // Create and add the chunked filters.
        inputBuffer.addFilter(new ChunkedInputFilter());
        outputBuffer.addFilter(new ChunkedOutputFilter());

        // Create and add the void filters.
        inputBuffer.addFilter(new VoidInputFilter());
        outputBuffer.addFilter(new VoidOutputFilter());

        // Create and add buffered input filter
        inputBuffer.addFilter(new BufferedInputFilter());

        // Create and add the chunked filters.
        //inputBuffer.addFilter(new GzipInputFilter());
     */
    }

    }


    /**
     * Add an input filter to the current request.
     *
     * @return false if the encoding was not found (which would mean it is
     * unsupported)
     */
    protected boolean addInputFilter(InputFilter[] inputFilters,
                                     String encodingName) {
        if (encodingName.equals("identity")) {
            // Skip
        } else if (encodingName.equals("chunked")) {
            inputBuffer.addActiveFilter
                (inputFilters[Constants.CHUNKED_FILTER]);
            contentDelimitation = true;
        } else {
            for (int i = 2; i < inputFilters.length; i++) {
                if (inputFilters[i].getEncodingName()
                    .toString().equals(encodingName)) {
                    inputBuffer.addActiveFilter(inputFilters[i]);
                    return true;
                }
            }
            return false;
        }
        return true;
    }


    /**
     * Specialized utility method: find a sequence of lower case bytes inside
     * a ByteChunk.
     */
    protected int findBytes(ByteChunk bc, byte[] b) {

        byte first = b[0];
        byte[] buff = bc.getBuffer();
        int start = bc.getStart();
        int end = bc.getEnd();

    // Look for first char
    int srcEnd = b.length;

    for (int i = start; i <= (end - srcEnd); i++) {
        if (Ascii.toLower(buff[i]) != first) continue;
        // found first char, now look for a match
            int myPos = i+1;
        for (int srcPos = 1; srcPos < srcEnd; ) {
                if (Ascii.toLower(buff[myPos++]) != b[srcPos++])
            break;
                if (srcPos == srcEnd) return i - start; // found it
        }
    }
    return -1;

    }

    /**
     * Determine if we must drop the connection because of the HTTP status
     * code.  Use the same list of codes as Apache/httpd.
     */
    protected boolean statusDropsConnection(int status) {
        return status == 400 /* SC_BAD_REQUEST */ ||
               status == 408 /* SC_REQUEST_TIMEOUT */ ||
               status == 411 /* SC_LENGTH_REQUIRED */ ||
               status == 413 /* SC_REQUEST_ENTITY_TOO_LARGE */ ||
               status == 414 /* SC_REQUEST_URI_TOO_LARGE */ ||
               status == 500 /* SC_INTERNAL_SERVER_ERROR */ ||
               status == 503 /* SC_SERVICE_UNAVAILABLE */ ||
               status == 501 /* SC_NOT_IMPLEMENTED */;
    }

}
=======
/*
 *  Copyright 1999-2004 The Apache Software Foundation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package org.apache.coyote.http11;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.nio.channels.SocketChannel;
import java.util.StringTokenizer;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

import org.apache.coyote.ActionCode;
import org.apache.coyote.ActionHook;
import org.apache.coyote.Adapter;
import org.apache.coyote.Request;
import org.apache.coyote.RequestInfo;
import org.apache.coyote.Response;
import org.apache.coyote.http11.filters.BufferedInputFilter;
import org.apache.coyote.http11.filters.ChunkedInputFilter;
import org.apache.coyote.http11.filters.ChunkedOutputFilter;
import org.apache.coyote.http11.filters.GzipOutputFilter;
import org.apache.coyote.http11.filters.IdentityInputFilter;
import org.apache.coyote.http11.filters.IdentityOutputFilter;
import org.apache.coyote.http11.filters.SavedRequestInputFilter;
import org.apache.coyote.http11.filters.VoidInputFilter;
import org.apache.coyote.http11.filters.VoidOutputFilter;
import org.apache.tomcat.util.buf.Ascii;
import org.apache.tomcat.util.buf.ByteChunk;
import org.apache.tomcat.util.buf.HexUtils;
import org.apache.tomcat.util.buf.MessageBytes;
import org.apache.tomcat.util.http.FastHttpDateFormat;
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.net.NioEndpoint;
import org.apache.tomcat.util.net.NioEndpoint.Handler;
import org.apache.tomcat.util.net.NioEndpoint.Handler.SocketState;
import org.apache.tomcat.util.res.StringManager;
import java.nio.channels.SelectionKey;


/**
 * Processes HTTP requests.
 *
 * @author Remy Maucherat
 * @author Filip Hanik
 */
public class Http11NioProcessor implements ActionHook {


    /**
     * Logger.
     */
    protected static org.apache.commons.logging.Log log
        = org.apache.commons.logging.LogFactory.getLog(Http11NioProcessor.class);

    /**
     * The string manager for this package.
    protected static StringManager sm =
        StringManager.getManager(Constants.Package);


    // ----------------------------------------------------------- Constructors


    public Http11NioProcessor(int headerBufferSize, NioEndpoint endpoint) {

        this.endpoint = endpoint;

        request = new Request();
        int readTimeout = endpoint.getFirstReadTimeout();
        if (readTimeout == 0) {
            readTimeout = 100;
        } else if (readTimeout < 0) {
            readTimeout = timeout;
            //readTimeout = -1;
        }
        inputBuffer = new InternalNioInputBuffer(request, headerBufferSize,readTimeout);
        inputBuffer.setPoller(endpoint.getPoller());
        request.setInputBuffer(inputBuffer);

        response = new Response();
        response.setHook(this);
        outputBuffer = new InternalNioOutputBuffer(response, headerBufferSize);
        response.setOutputBuffer(outputBuffer);
        request.setResponse(response);

        ssl = !"off".equalsIgnoreCase(endpoint.getSSLEngine());

        initializeFilters();

        // Cause loading of HexUtils
        int foo = HexUtils.DEC[0];

        // Cause loading of FastHttpDateFormat
        FastHttpDateFormat.getCurrentDate();

    }


    // ----------------------------------------------------- Instance Variables


    /**
     * Associated adapter.
     */
    protected Adapter adapter = null;


    /**
     * Request object.
     */
    protected Request request = null;
    /**


    /**
     * Response object.
     */
    protected Response response = null;


    /**
     * Input.
     */
    protected InternalNioInputBuffer inputBuffer = null;


    /**
     * Output.
     */
    protected InternalNioOutputBuffer outputBuffer = null;


    /**
     * Error flag.
     */
    protected boolean error = false;


    /**
     * Keep-alive.
     */
    protected boolean keepAlive = true;


    /**
     * HTTP/1.1 flag.
     */
    protected boolean http11 = true;


    /**
     * HTTP/0.9 flag.
     */
    protected boolean http09 = false;



    /**
     * Comet used.
     */
    protected boolean comet = false;
    
    /**
     * Closed flag, a Comet async thread can 
     * signal for this Nio processor to be closed and recycled instead
     * of waiting for a timeout.
     * Closed by HttpServletResponse.getWriter().close()
     */
    protected boolean cometClose = false;

    /**
     * Content delimitator for the request (if false, the connection will
     * be closed at the end of the request).
     */
    protected boolean contentDelimitation = true;


    /**
     * Is there an expectation ?
     */
    protected boolean expectation = false;


    /**
     * List of restricted user agents.
     */
    protected Pattern[] restrictedUserAgents = null;


    /**
     * Maximum number of Keep-Alive requests to honor.
     */
    protected int maxKeepAliveRequests = -1;


    /**
     * SSL enabled ?
     */
    protected boolean ssl = false;


    /**
     * Socket associated with the current connection.
     */
    protected SocketChannel socket = null;


    /**
     * Remote Address associated with the current connection.
     */
    protected String remoteAddr = null;


    /**
     * Remote Host associated with the current connection.
     */
    protected String remoteHost = null;


    /**
     * Local Host associated with the current connection.
     */
    protected String localName = null;



    /**
     * Local port to which the socket is connected
     */
    protected int localPort = -1;


    /**
     * Remote port to which the socket is connected
     */
    protected int remotePort = -1;


        }
     * The local Host address.
     */
    protected String localAddr = null;


    /**
     * Maximum timeout on uploads. 5 minutes as in Apache HTTPD server.
     */
    protected int timeout = 300000;


    /**
     * Flag to disable setting a different time-out on uploads.
     */
    protected boolean disableUploadTimeout = false;


    /**
     * Allowed compression level.
     */
    protected int compressionLevel = 0;


    /**
     * Minimum contentsize to make compression.
     */
    protected int compressionMinSize = 2048;


    /**
     * Socket buffering.
     */
    protected int socketBuffer = -1;


    /**
     * Max save post size.
     */
    protected int maxSavePostSize = 4 * 1024;


    /**
     * List of user agents to not use gzip with
     */
    protected Pattern noCompressionUserAgents[] = null;

    /**
     * List of MIMES which could be gzipped
     */
    protected String[] compressableMimeTypes =
    { "text/html", "text/xml", "text/plain" };


    /**
     * Host name (used to avoid useless B2C conversion on the host name).
     */
    protected char[] hostNameC = new char[0];


    /**
     * Associated endpoint.
     */
    protected NioEndpoint endpoint;


    /**
     * Allow a customized the server header for the tin-foil hat folks.
     */
    protected String server = null;


    // ------------------------------------------------------------- Properties


    /**
     * Return compression level.
     */
    public String getCompression() {
        switch (compressionLevel) {
        case 0:
            return "off";
        case 1:
            return "on";
        case 2:
            return "force";
        }
        return "off";
    }


    /**
     * Set compression level.
     */
    public void setCompression(String compression) {
        if (compression.equals("on")) {
            this.compressionLevel = 1;
        } else if (compression.equals("force")) {
            this.compressionLevel = 2;
        } else if (compression.equals("off")) {
            this.compressionLevel = 0;
        } else {
            try {
                // Try to parse compression as an int, which would give the
                // minimum compression size
                compressionMinSize = Integer.parseInt(compression);
                this.compressionLevel = 1;
            } catch (Exception e) {
                this.compressionLevel = 0;
            }
        }
    }

    /**
     * Set Minimum size to trigger compression.
     */
    public void setCompressionMinSize(int compressionMinSize) {
        this.compressionMinSize = compressionMinSize;
    }


    /**
     * Add user-agent for which gzip compression didn't works
     * The user agent String given will be exactly matched
     * to the user-agent header submitted by the client.
     *
     * @param userAgent user-agent string
     */
    public void addNoCompressionUserAgent(String userAgent) {
        try {
            Pattern nRule = Pattern.compile(userAgent);
            noCompressionUserAgents =
                addREArray(noCompressionUserAgents, nRule);
        } catch (PatternSyntaxException pse) {
            log.error(sm.getString("http11processor.regexp.error", userAgent), pse);
        }
    }


    /**
     * Set no compression user agent list (this method is best when used with
     * a large number of connectors, where it would be better to have all of
     * them referenced a single array).
     */
    public void setNoCompressionUserAgents(Pattern[] noCompressionUserAgents) {
        this.noCompressionUserAgents = noCompressionUserAgents;
    }


    /**
     * Set no compression user agent list.
     * List contains users agents separated by ',' :
     *
     * ie: "gorilla,desesplorer,tigrus"
     */
    public void setNoCompressionUserAgents(String noCompressionUserAgents) {
        if (noCompressionUserAgents != null) {
            StringTokenizer st = new StringTokenizer(noCompressionUserAgents, ",");

            while (st.hasMoreTokens()) {
                addNoCompressionUserAgent(st.nextToken().trim());
            }
        }
    }

    /**
     * Add a mime-type which will be compressable
     * The mime-type String will be exactly matched
     * in the response mime-type header .
     *
     * @param mimeType mime-type string
     */
    public void addCompressableMimeType(String mimeType) {
        compressableMimeTypes =
            addStringArray(compressableMimeTypes, mimeType);
    }


    /**
     * Set compressable mime-type list (this method is best when used with
     * a large number of connectors, where it would be better to have all of
     * them referenced a single array).
     */
    public void setCompressableMimeTypes(String[] compressableMimeTypes) {
        this.compressableMimeTypes = compressableMimeTypes;
    }


    /**
     * Set compressable mime-type list
     * List contains users agents separated by ',' :
     *
     * ie: "text/html,text/xml,text/plain"
     */
    public void setCompressableMimeTypes(String compressableMimeTypes) {
        if (compressableMimeTypes != null) {
            StringTokenizer st = new StringTokenizer(compressableMimeTypes, ",");

            while (st.hasMoreTokens()) {
                addCompressableMimeType(st.nextToken().trim());
            }
        }
    }


    /**
     * Return the list of restricted user agents.
     */
    public String[] findCompressableMimeTypes() {
        return (compressableMimeTypes);
    }



    // --------------------------------------------------------- Public Methods


    /**
     * Add input or output filter.
     *
     * @param className class name of the filter
     */
    protected void addFilter(String className) {
        try {
    /**
                request.setServerPort(80);
            Class clazz = Class.forName(className);
            Object obj = clazz.newInstance();
            if (obj instanceof InputFilter) {
                inputBuffer.addFilter((InputFilter) obj);
            } else if (obj instanceof OutputFilter) {
                outputBuffer.addFilter((OutputFilter) obj);
            } else {
                log.warn(sm.getString("http11processor.filter.unknown", className));
            }
        } catch (Exception e) {
            log.error(sm.getString("http11processor.filter.error", className), e);
        }
    }


    /**
     * General use method
     *
     * @param sArray the StringArray
     * @param value string
     */
    private String[] addStringArray(String sArray[], String value) {
        String[] result = null;
        if (sArray == null) {
            result = new String[1];
            result[0] = value;
        }
        else {
            result = new String[sArray.length + 1];
            for (int i = 0; i < sArray.length; i++)
                result[i] = sArray[i];
            result[sArray.length] = value;
        }
        return result;
    }


    /**
     * General use method
     *
     * @param rArray the REArray
     * @param value Obj
     */
    private Pattern[] addREArray(Pattern rArray[], Pattern value) {
        Pattern[] result = null;
        if (rArray == null) {
            result = new Pattern[1];
            result[0] = value;
        }
        else {
            result = new Pattern[rArray.length + 1];
            for (int i = 0; i < rArray.length; i++)
                result[i] = rArray[i];
            result[rArray.length] = value;
        }
        return result;
    }


    /**
     * General use method
     *
     * @param sArray the StringArray
     * @param value string
     */
    private boolean inStringArray(String sArray[], String value) {
        for (int i = 0; i < sArray.length; i++) {
            if (sArray[i].equals(value)) {
                return true;
            }
        }
        return false;
    }


    /**
     * Checks if any entry in the string array starts with the specified value
     *
     * @param sArray the StringArray
     * @param value string
     */
    private boolean startsWithStringArray(String sArray[], String value) {
        if (value == null)
           return false;
        for (int i = 0; i < sArray.length; i++) {
            if (value.startsWith(sArray[i])) {
                return true;
            }
        }
        return false;
    }


    /**
     * Add restricted user-agent (which will downgrade the connector
     * to HTTP/1.0 mode). The user agent String given will be matched
     * via regexp to the user-agent header submitted by the client.
     *
     * @param userAgent user-agent string
     */
    public void addRestrictedUserAgent(String userAgent) {
        try {
            Pattern nRule = Pattern.compile(userAgent);
            restrictedUserAgents = addREArray(restrictedUserAgents, nRule);
        } catch (PatternSyntaxException pse) {
            log.error(sm.getString("http11processor.regexp.error", userAgent), pse);


    /**
     * Set restricted user agent list (this method is best when used with
     * a large number of connectors, where it would be better to have all of
     * them referenced a single array).
     */
    public void setRestrictedUserAgents(Pattern[] restrictedUserAgents) {
        this.restrictedUserAgents = restrictedUserAgents;
    }


    /**
     * Set restricted user agent list (which will downgrade the connector
     * to HTTP/1.0 mode). List contains users agents separated by ',' :
     *
     * ie: "gorilla,desesplorer,tigrus"
     */
    public void setRestrictedUserAgents(String restrictedUserAgents) {
        if (restrictedUserAgents != null) {
            StringTokenizer st =
                new StringTokenizer(restrictedUserAgents, ",");
            while (st.hasMoreTokens()) {
                addRestrictedUserAgent(st.nextToken().trim());
            }
        }
    }


    /**
     * Return the list of restricted user agents.
     */
    public String[] findRestrictedUserAgents() {
        String[] sarr = new String [restrictedUserAgents.length];

        for (int i = 0; i < restrictedUserAgents.length; i++)
            sarr[i] = restrictedUserAgents[i].toString();

        return (sarr);
    }


    /**
     * Set the maximum number of Keep-Alive requests to honor.
     * This is to safeguard from DoS attacks.  Setting to a negative
     * value disables the check.
     */
    public void setMaxKeepAliveRequests(int mkar) {
        maxKeepAliveRequests = mkar;
    }


    /**
     * Return the number of Keep-Alive requests that we will honor.
     */
    public int getMaxKeepAliveRequests() {
        return maxKeepAliveRequests;
    }


    /**
     * Set the maximum size of a POST which will be buffered in SSL mode.
     */
    public void setMaxSavePostSize(int msps) {
        maxSavePostSize = msps;
    }


    /**
     * Return the maximum size of a POST which will be buffered in SSL mode.
     */
    public int getMaxSavePostSize() {
        return maxSavePostSize;
    }


    /**
     * Set the flag to control upload time-outs.
     */
    public void setDisableUploadTimeout(boolean isDisabled) {
        disableUploadTimeout = isDisabled;
    }

    /**
     * Get the flag that controls upload time-outs.
     */
    public boolean getDisableUploadTimeout() {
        return disableUploadTimeout;
    }

    /**
     * Set the socket buffer flag.
     */
    public void setSocketBuffer(int socketBuffer) {
        this.socketBuffer = socketBuffer;
        outputBuffer.setSocketBuffer(socketBuffer);
    }

    /**
     * Get the socket buffer flag.
     */
    public int getSocketBuffer() {
        return socketBuffer;
    }

    /**
     * Set the upload timeout.
     */
    public void setTimeout( int timeouts ) {
        timeout = timeouts ;
    }

     * Get the upload timeout.
     */
    public int getTimeout() {
        return timeout;
    }


    /**
     * Set the server header name.
     */
    public void setServer( String server ) {
        if (server==null || server.equals("")) {
            this.server = null;
        } else {
            this.server = server;
        }
    }

    /**
     * Get the server header name.
     */
    public String getServer() {
        return server;
    }


    /** Get the request associated with this processor.
     *
     * @return The request
     */
    public Request getRequest() {
        return request;
    }

    /**
     * Process pipelined HTTP requests using the specified input and output
     * streams.
     *
     * @throws IOException error during an I/O operation
     */
    public SocketState event(boolean error)
        throws IOException {

        RequestInfo rp = request.getRequestProcessor();

        try {
            rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
            error = !adapter.event(request, response, error);
            if (request.getAttribute("org.apache.tomcat.comet") == null) {
                comet = false;
            }
            SelectionKey key = socket.keyFor(endpoint.getPoller().getSelector());
            if ( key != null ) {
                NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
                if ( attach!=null ) {
                    attach.setComet(comet);
                    Integer comettimeout = (Integer)request.getAttribute("org.apache.tomcat.comet.timeout");
                    if ( comettimeout != null ) attach.setTimeout(comettimeout.longValue());
                }
            }
            
        } catch (InterruptedIOException e) {
            error = true;
        } catch (Throwable t) {
            log.error(sm.getString("http11processor.request.process"), t);
            // 500 - Internal Server Error
            response.setStatus(500);
            error = true;
        }

        rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

        if (error) {
            recycle();
            return SocketState.CLOSED;
        } else if (!comet) {
            recycle();
            endpoint.getPoller().add(socket);
            return SocketState.OPEN;
        } else {
            endpoint.getCometPoller().add(socket);
            return SocketState.LONG;
        }
    }

    /**
     * Process pipelined HTTP requests using the specified input and output
     * streams.
     *
     * @throws IOException error during an I/O operation
     */
    public SocketState process(SocketChannel socket)
        throws IOException {
        RequestInfo rp = request.getRequestProcessor();
        rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

        // Set the remote address
        remoteAddr = null;
        remoteHost = null;
        localAddr = null;
        localName = null;
        remotePort = -1;
        localPort = -1;

        // Setting up the socket
        this.socket = socket;
        inputBuffer.setSocket(socket);
        outputBuffer.setSocket(socket);
        outputBuffer.setSelector(endpoint.getPoller().getSelector());

        // Error flag
        error = false;
        keepAlive = true;

        int keepAliveLeft = maxKeepAliveRequests;
        long soTimeout = endpoint.getSoTimeout();

        int limit = 0;
        if (endpoint.getFirstReadTimeout() > 0 || endpoint.getFirstReadTimeout() < -1) {
            limit = endpoint.getMaxThreads() / 2;
        }

        boolean keptAlive = false;
        boolean openSocket = false;

        while (!error && keepAlive && !comet) {

            // Parsing the request header
            try {
                if( !disableUploadTimeout && keptAlive && soTimeout > 0 ) {
                    socket.socket().setSoTimeout((int)soTimeout);
                    inputBuffer.readTimeout = soTimeout;
                }
                if (!inputBuffer.parseRequestLine
                        (keptAlive && (endpoint.getCurrentThreadsBusy() > limit))) {
                    // This means that no data is available right now
                    // (long keepalive), so that the processor should be recycled
                    // and the method should return true
                    openSocket = true;
                    // Add the socket to the poller
                    endpoint.getPoller().add(socket);
                    break;
                }
                request.setStartTime(System.currentTimeMillis());
                keptAlive = true;
                if (!disableUploadTimeout) {
                    socket.socket().setSoTimeout((int)timeout);
                    inputBuffer.readTimeout = soTimeout;
                }
                inputBuffer.parseHeaders();
            recycle();
            } catch (IOException e) {
                error = true;
                break;
            } catch (Throwable t) {
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("http11processor.header.parse"), t);
                }
                // 400 - Bad Request
                response.setStatus(400);
                error = true;
            }

            // Setting up filters, and parse some request headers
            rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
            try {
                prepareRequest();
            } catch (Throwable t) {
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("http11processor.request.prepare"), t);
                }
                // 400 - Internal Server Error
                response.setStatus(400);
                error = true;
            }

            if (maxKeepAliveRequests > 0 && --keepAliveLeft == 0)
                keepAlive = false;

            // Process the request in the adapter
            if (!error) {
                try {
                    rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                    adapter.service(request, response);
                    // Handle when the response was committed before a serious
                    // error occurred.  Throwing a ServletException should both
                    // set the status to 500 and set the errorException.
                    // If we fail here, then the response is likely already
                    // committed, so we can't try and set headers.
                    if(keepAlive && !error) { // Avoid checking twice.
                        error = response.getErrorException() != null ||
                                statusDropsConnection(response.getStatus());
                    }
                    // Comet support
                    if (request.getAttribute("org.apache.tomcat.comet") != null) {
                        comet = true;
                    }
                    SelectionKey key = socket.keyFor(endpoint.getPoller().getSelector());
                    if (key != null) {
                        NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
                        if (attach != null)  {
                            attach.setComet(comet);
                            Integer comettimeout = (Integer) request.getAttribute("org.apache.tomcat.comet.timeout");
                            if (comettimeout != null) attach.setTimeout(comettimeout.longValue());
                        }
                    }
                } catch (InterruptedIOException e) {
                    error = true;
                } catch (Throwable t) {
                    log.error(sm.getString("http11processor.request.process"), t);
                    // 500 - Internal Server Error
                    response.setStatus(500);
                    error = true;
                }
            }

            // Finish the handling of the request
            if (!comet) {
                endRequest();
            }

            // If there was an error, make sure the request is counted as
            // and error, and update the statistics counter
            if (error) {
                response.setStatus(500);
            }
            request.updateCounters();

            rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);

        }

        rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

        if (comet) {
            if (error) {
                recycle();
                return SocketState.CLOSED;
            } else {
                return SocketState.LONG;
            }
        } else {
            return (openSocket) ? SocketState.OPEN : SocketState.CLOSED;
        }

    }


    public void endRequest() {

        // Finish the handling of the request
        try {
            inputBuffer.endRequest();
        } catch (IOException e) {
            error = true;
        } catch (Throwable t) {
            log.error(sm.getString("http11processor.request.finish"), t);
            // 500 - Internal Server Error
            response.setStatus(500);
            error = true;
        }
        try {
            outputBuffer.endRequest();
        } catch (IOException e) {
            error = true;
        } catch (Throwable t) {
            log.error(sm.getString("http11processor.response.finish"), t);
            error = true;
        }

        // Next request
        inputBuffer.nextRequest();
        outputBuffer.nextRequest();

    }


    public void recycle() {
        inputBuffer.recycle();
        outputBuffer.recycle();
        this.socket = null;
        this.cometClose = false;
        this.comet = false;
    }


    // ----------------------------------------------------- ActionHook Methods


    /**
     * Send an action to the connector.
     *
     * @param actionCode Type of the action
     * @param param Action parameter
     */
    public void action(ActionCode actionCode, Object param) {

        if (actionCode == ActionCode.ACTION_COMMIT) {
            // Commit current response

            if (response.isCommitted())
                return;

            // Validate and write response headers
            prepareResponse();
            try {
                outputBuffer.commit();
            } catch (IOException e) {
                // Set error flag
                error = true;
            }

        } else if (actionCode == ActionCode.ACTION_ACK) {

            // Acknowlege request

            // Send a 100 status back if it makes sense (response not committed
            // yet, and client specified an expectation for 100-continue)

            if ((response.isCommitted()) || !expectation)
                return;

            inputBuffer.setSwallowInput(true);
            try {
                outputBuffer.sendAck();
            } catch (IOException e) {
                // Set error flag
                error = true;
            }

        } else if (actionCode == ActionCode.ACTION_CLIENT_FLUSH) {

            try {
                outputBuffer.flush();
            } catch (IOException e) {
                // Set error flag
                error = true;
                response.setErrorException(e);
            }

        } else if (actionCode == ActionCode.ACTION_CLOSE) {
            // Close

            // End the processing of the current request, and stop any further
            // transactions with the client

            comet = false;
            cometClose = true;
            SelectionKey key = socket.keyFor(endpoint.getPoller().getSelector());
            if ( key != null ) {
                NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
                if ( attach!=null && attach.getComet()) {
                    //if this is a comet connection
                    //then execute the connection closure at the next selector loop
                    request.getAttributes().remove("org.apache.tomcat.comet.timeout");
                    attach.setError(true);
                }
            }

            try {
                outputBuffer.endRequest();
            } catch (IOException e) {
                // Set error flag
                error = true;
            }

        } else if (actionCode == ActionCode.ACTION_RESET) {

            // Reset response

            // Note: This must be called before the response is committed

            outputBuffer.reset();

        } else if (actionCode == ActionCode.ACTION_CUSTOM) {

            // Do nothing

        } else if (actionCode == ActionCode.ACTION_REQ_HOST_ADDR_ATTRIBUTE) {

            // Get remote host address
            if ((remoteAddr == null) && (socket != null)) {
                InetAddress inetAddr = socket.socket().getInetAddress();
                if (inetAddr != null) {
                    remoteAddr = inetAddr.getHostAddress();
                }
            }
            request.remoteAddr().setString(remoteAddr);

        } else if (actionCode == ActionCode.ACTION_REQ_LOCAL_NAME_ATTRIBUTE) {

            // Get local host name
            if ((localName == null) && (socket != null)) {
                InetAddress inetAddr = socket.socket().getLocalAddress();
                if (inetAddr != null) {
                    localName = inetAddr.getHostName();
                }
            }
            request.localName().setString(localName);

        } else if (actionCode == ActionCode.ACTION_REQ_HOST_ATTRIBUTE) {

            // Get remote host name
            if ((remoteHost == null) && (socket != null)) {
                InetAddress inetAddr = socket.socket().getInetAddress();
                if (inetAddr != null) {
                    remoteHost = inetAddr.getHostName();
                }
                if(remoteHost == null) {
                    if(remoteAddr != null) {
                        remoteHost = remoteAddr;
                    } else { // all we can do is punt
                        request.remoteHost().recycle();
                    }
                }
            }
            request.remoteHost().setString(remoteHost);

        } else if (actionCode == ActionCode.ACTION_REQ_LOCAL_ADDR_ATTRIBUTE) {

            if (localAddr == null)
               localAddr = socket.socket().getLocalAddress().getHostAddress();

            request.localAddr().setString(localAddr);

        } else if (actionCode == ActionCode.ACTION_REQ_REMOTEPORT_ATTRIBUTE) {

            if ((remotePort == -1 ) && (socket !=null)) {
                remotePort = socket.socket().getPort();
            }
            request.setRemotePort(remotePort);

        } else if (actionCode == ActionCode.ACTION_REQ_LOCALPORT_ATTRIBUTE) {

            if ((localPort == -1 ) && (socket !=null)) {
                localPort = socket.socket().getLocalPort();
            }
            request.setLocalPort(localPort);

        } else if (actionCode == ActionCode.ACTION_REQ_SSL_ATTRIBUTE ) {

//            if (ssl && (socket != 0)) {
//                try {
//                    // Cipher suite
//                    Object sslO = SSLSocket.getInfoS(socket, SSL.SSL_INFO_CIPHER);
//                    if (sslO != null) {
//                        request.setAttribute
//                            (NioEndpoint.CIPHER_SUITE_KEY, sslO);
//                    }
//                    // Client certificate chain if present
//                    int certLength = SSLSocket.getInfoI(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN);
//                    X509Certificate[] certs = null;
//                    if (certLength > 0) {
//                        certs = new X509Certificate[certLength];
//                        for (int i = 0; i < certLength; i++) {
//                            byte[] data = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN + i);
//                            CertificateFactory cf =
//                                CertificateFactory.getInstance("X.509");
//                            ByteArrayInputStream stream = new ByteArrayInputStream(data);
//                            certs[i] = (X509Certificate) cf.generateCertificate(stream);
//                        }
//                    }
//                    if (certs != null) {
//                        request.setAttribute
//                            (NioEndpoint.CERTIFICATE_KEY, certs);
//                    }
//                    // User key size
//                    sslO = new Integer(SSLSocket.getInfoI(socket, SSL.SSL_INFO_CIPHER_USEKEYSIZE));
//                    if (sslO != null) {
//                        request.setAttribute
//                            (NioEndpoint.KEY_SIZE_KEY, sslO);
//                    }
//                    // SSL session ID
//                    sslO = SSLSocket.getInfoS(socket, SSL.SSL_INFO_SESSION_ID);
//                    if (sslO != null) {
//                        request.setAttribute
//                            (NioEndpoint.SESSION_ID_KEY, sslO);
//                    }
//                } catch (Exception e) {
//                    log.warn(sm.getString("http11processor.socket.ssl"), e);
//                }
//            }

        } else if (actionCode == ActionCode.ACTION_REQ_SSL_CERTIFICATE) {

//            if (ssl && (socket != 0)) {
//                 // Consume and buffer the request body, so that it does not
            internalBuffer.addActiveFilter(savedBody);
//                 // interfere with the client's handshake messages
//                InputFilter[] inputFilters = inputBuffer.getFilters();
//                ((BufferedInputFilter) inputFilters[Constants.BUFFERED_FILTER])
//                    .setLimit(maxSavePostSize);
//                inputBuffer.addActiveFilter
//                    (inputFilters[Constants.BUFFERED_FILTER]);
//                try {
//                    // Renegociate certificates
//                    SSLSocket.renegotiate(socket);
//                    // Client certificate chain if present
//                    int certLength = SSLSocket.getInfoI(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN);
//                    X509Certificate[] certs = null;
//                    if (certLength > 0) {
//                        certs = new X509Certificate[certLength];
//                        for (int i = 0; i < certLength; i++) {
//                            byte[] data = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN + i);
//                            CertificateFactory cf =
//                                CertificateFactory.getInstance("X.509");
//                            ByteArrayInputStream stream = new ByteArrayInputStream(data);
//                            certs[i] = (X509Certificate) cf.generateCertificate(stream);
//                        }
//                    }
//                    if (certs != null) {
//                        request.setAttribute
//                            (NioEndpoint.CERTIFICATE_KEY, certs);
//                    }
//                } catch (Exception e) {
//                    log.warn(sm.getString("http11processor.socket.ssl"), e);
//                }
//            }

        } else if (actionCode == ActionCode.ACTION_REQ_SET_BODY_REPLAY) {
            ByteChunk body = (ByteChunk) param;

            InputFilter savedBody = new SavedRequestInputFilter(body);
            savedBody.setRequest(request);

            InternalNioInputBuffer internalBuffer = (InternalNioInputBuffer)
                request.getInputBuffer();
        }

    }


    // ------------------------------------------------------ Connector Methods


    /**
     * Set the associated adapter.
     *
     * @param adapter the new adapter
     */
    public void setAdapter(Adapter adapter) {
        this.adapter = adapter;
    }


    /**
     * Get the associated adapter.
     *
     * @return the associated adapter
     */
    public Adapter getAdapter() {
        return adapter;
    }


    // ------------------------------------------------------ Protected Methods


    /**
     * After reading the request headers, we have to setup the request filters.
     */
    protected void prepareRequest() {

        http11 = true;
        http09 = false;
        contentDelimitation = false;
        expectation = false;
        if (ssl) {
            request.scheme().setString("https");
        }
        MessageBytes protocolMB = request.protocol();
        if (protocolMB.equals(Constants.HTTP_11)) {
            http11 = true;
            protocolMB.setString(Constants.HTTP_11);
        } else if (protocolMB.equals(Constants.HTTP_10)) {
            http11 = false;
            keepAlive = false;
            protocolMB.setString(Constants.HTTP_10);
        } else if (protocolMB.equals("")) {
            // HTTP/0.9
            http09 = true;
            http11 = false;
            keepAlive = false;
        } else {

            // Unsupported protocol
            http11 = false;
            error = true;
            // Send 505; Unsupported HTTP version
            response.setStatus(505);
        }

        MessageBytes methodMB = request.method();
        if (methodMB.equals(Constants.GET)) {
            methodMB.setString(Constants.GET);
        } else if (methodMB.equals(Constants.POST)) {
            methodMB.setString(Constants.POST);
        }

        MimeHeaders headers = request.getMimeHeaders();

        // Check connection header
        MessageBytes connectionValueMB = headers.getValue("connection");
        if (connectionValueMB != null) {
            ByteChunk connectionValueBC = connectionValueMB.getByteChunk();
            if (findBytes(connectionValueBC, Constants.CLOSE_BYTES) != -1) {
                keepAlive = false;
            } else if (findBytes(connectionValueBC,
                                 Constants.KEEPALIVE_BYTES) != -1) {
                keepAlive = true;
            }
        }

        MessageBytes expectMB = null;
        if (http11)
            expectMB = headers.getValue("expect");
        if ((expectMB != null)
            && (expectMB.indexOfIgnoreCase("100-continue", 0) != -1)) {
            inputBuffer.setSwallowInput(false);
            expectation = true;
        }

        // Check user-agent header
        if ((restrictedUserAgents != null) && ((http11) || (keepAlive))) {
            MessageBytes userAgentValueMB = headers.getValue("user-agent");
            // Check in the restricted list, and adjust the http11
            // and keepAlive flags accordingly
            if(userAgentValueMB != null) {
                String userAgentValue = userAgentValueMB.toString();
                for (int i = 0; i < restrictedUserAgents.length; i++) {
        // Check if content is not allready gzipped
                    if (restrictedUserAgents[i].matcher(userAgentValue).matches()) {
                        http11 = false;
                        keepAlive = false;
                        break;
                    }
                }
            }
        }

        // Check for a full URI (including protocol://host:port/)
        ByteChunk uriBC = request.requestURI().getByteChunk();
        if (uriBC.startsWithIgnoreCase("http", 0)) {

            int pos = uriBC.indexOf("://", 0, 3, 4);
            int uriBCStart = uriBC.getStart();
            int slashPos = -1;
            if (pos != -1) {
                byte[] uriB = uriBC.getBytes();
                slashPos = uriBC.indexOf('/', pos + 3);
                if (slashPos == -1) {
                    slashPos = uriBC.getLength();
                    // Set URI as "/"
                    request.requestURI().setBytes
                        (uriB, uriBCStart + pos + 1, 1);
                } else {
                    request.requestURI().setBytes
                        (uriB, uriBCStart + slashPos,
                         uriBC.getLength() - slashPos);
                }
                MessageBytes hostMB = headers.setValue("host");
                hostMB.setBytes(uriB, uriBCStart + pos + 3,
                                slashPos - pos - 3);
            }

        }

        // Input filter setup
        InputFilter[] inputFilters = inputBuffer.getFilters();

        // Parse transfer-encoding header
        MessageBytes transferEncodingValueMB = null;
        if (http11)
            transferEncodingValueMB = headers.getValue("transfer-encoding");
        if (transferEncodingValueMB != null) {
            String transferEncodingValue = transferEncodingValueMB.toString();
            // Parse the comma separated list. "identity" codings are ignored
            int startPos = 0;
                // 80 - Default HTTP port
            int commaPos = transferEncodingValue.indexOf(',');
            String encodingName = null;
            while (commaPos != -1) {
                encodingName = transferEncodingValue.substring
                    (startPos, commaPos).toLowerCase().trim();
                if (!addInputFilter(inputFilters, encodingName)) {
                    // Unsupported transfer encoding
                    error = true;
                    // 501 - Unimplemented
                    response.setStatus(501);
                }
                startPos = commaPos + 1;
                commaPos = transferEncodingValue.indexOf(',', startPos);
            }
            encodingName = transferEncodingValue.substring(startPos)
                .toLowerCase().trim();
            if (!addInputFilter(inputFilters, encodingName)) {
                // Unsupported transfer encoding
                error = true;
                // 501 - Unimplemented
                response.setStatus(501);
            }
        }

        // Parse content-length header
        long contentLength = request.getContentLengthLong();
        if (contentLength >= 0 && !contentDelimitation) {
            inputBuffer.addActiveFilter
                (inputFilters[Constants.IDENTITY_FILTER]);
            contentDelimitation = true;
        }

        MessageBytes valueMB = headers.getValue("host");

        // Check host header
        if (http11 && (valueMB == null)) {
            error = true;
            // 400 - Bad request
            response.setStatus(400);
        }

        parseHost(valueMB);

        if (!contentDelimitation) {
            // If there's no content length 
            // (broken HTTP/1.0 or HTTP/1.1), assume

            // the client is not broken and didn't send a body
            inputBuffer.addActiveFilter
                    (inputFilters[Constants.VOID_FILTER]);
            contentDelimitation = true;
        }

        // Advertise comet support through a request attribute
        request.setAttribute("org.apache.tomcat.comet.support", Boolean.TRUE);
        // Advertise comet timeout support
        request.setAttribute("org.apache.tomcat.comet.timeout.support", Boolean.TRUE);

    }


    /**
     * Parse host.
     */
    public void parseHost(MessageBytes valueMB) {

        if (valueMB == null || valueMB.isNull()) {
            // HTTP/1.0
            // Default is what the socket tells us. Overriden if a host is
            // found/parsed
            request.setServerPort(endpoint.getPort());
            return;
        }

        ByteChunk valueBC = valueMB.getByteChunk();
        byte[] valueB = valueBC.getBytes();
        int valueL = valueBC.getLength();
        int valueS = valueBC.getStart();
        int colonPos = -1;
        if (hostNameC.length < valueL) {
            hostNameC = new char[valueL];
        }

        boolean ipv6 = (valueB[valueS] == '[');
        boolean bracketClosed = false;
        for (int i = 0; i < valueL; i++) {
            char b = (char) valueB[i + valueS];
            hostNameC[i] = b;
            if (b == ']') {
                bracketClosed = true;
            } else if (b == ':') {
                if (!ipv6 || bracketClosed) {
                    colonPos = i;
                    break;
                }
            }
        }

        if (colonPos < 0) {
            if (!ssl) {
        MessageBytes contentEncodingMB =
            } else {
                // 443 - Default HTTPS port
                request.setServerPort(443);
            }
            request.serverName().setChars(hostNameC, 0, valueL);
        } else {

            request.serverName().setChars(hostNameC, 0, colonPos);

            int port = 0;
            int mult = 1;
            for (int i = valueL - 1; i > colonPos; i--) {
                int charValue = HexUtils.DEC[(int) valueB[i + valueS]];
                if (charValue == -1) {
                    // Invalid character
                    error = true;
                    // 400 - Bad request
                    response.setStatus(400);
                    break;
                }
                port = port + (charValue * mult);
                mult = 10 * mult;
            }
            request.setServerPort(port);

        }

    }


    /**
     * Check for compression
     */
    private boolean isCompressable() {

        // Nope Compression could works in HTTP 1.0 also
        // cf: mod_deflate

        // Compression only since HTTP 1.1
        // if (! http11)
        //    return false;

        // Check if browser support gzip encoding
        MessageBytes acceptEncodingMB =
            request.getMimeHeaders().getValue("accept-encoding");

        if ((acceptEncodingMB == null)
            || (acceptEncodingMB.indexOf("gzip") == -1))
            return false;
            response.getMimeHeaders().getValue("Content-Encoding");

        if ((contentEncodingMB != null)
            && (contentEncodingMB.indexOf("gzip") != -1))
            return false;

        // If force mode, allways compress (test purposes only)
        if (compressionLevel == 2)
           return true;

        // Check for incompatible Browser
        if (noCompressionUserAgents != null) {
            MessageBytes userAgentValueMB =
                request.getMimeHeaders().getValue("user-agent");
            if(userAgentValueMB != null) {
                String userAgentValue = userAgentValueMB.toString();

                // If one Regexp rule match, disable compression
                for (int i = 0; i < noCompressionUserAgents.length; i++)
                    if (noCompressionUserAgents[i].matcher(userAgentValue).matches())
                        return false;
            }
        }

        // Check if suffisant len to trig the compression
        long contentLength = response.getContentLengthLong();
        if ((contentLength == -1)
            || (contentLength > compressionMinSize)) {
            // Check for compatible MIME-TYPE
            if (compressableMimeTypes != null) {
                return (startsWithStringArray(compressableMimeTypes,
                                              response.getContentType()));
            }
        }

        return false;
    }


    /**
     * When committing the response, we have to validate the set of headers, as
     * well as setup the response filters.
     */
    protected void prepareResponse() {

        boolean entityBody = true;
        contentDelimitation = false;

        OutputFilter[] outputFilters = outputBuffer.getFilters();
        long contentLength = response.getContentLengthLong();

        if (http09 == true) {
            // HTTP/0.9
        outputBuffer.sendStatus();

            outputBuffer.addActiveFilter
                (outputFilters[Constants.IDENTITY_FILTER]);
            return;
        }

        int statusCode = response.getStatus();
        if ((statusCode == 204) || (statusCode == 205)
            || (statusCode == 304)) {
            // No entity body
            outputBuffer.addActiveFilter
                (outputFilters[Constants.VOID_FILTER]);
            entityBody = false;
            contentDelimitation = true;
        }

        MessageBytes methodMB = request.method();
        if (methodMB.equals("HEAD")) {
            // No entity body
            outputBuffer.addActiveFilter
                (outputFilters[Constants.VOID_FILTER]);
            contentDelimitation = true;
        }


        // Check for compression
        boolean useCompression = false;
        if (entityBody && (compressionLevel > 0)) {
            useCompression = isCompressable();
            // Change content-length to -1 to force chunking
            if (useCompression) {
                response.setContentLength(-1);
            }
        }

        MimeHeaders headers = response.getMimeHeaders();
        if (!entityBody) {
            response.setContentLength(-1);
        } else {
            String contentType = response.getContentType();
            if (contentType != null) {
                headers.setValue("Content-Type").setString(contentType);
            }
            String contentLanguage = response.getContentLanguage();
            if (contentLanguage != null) {
                headers.setValue("Content-Language")
                    .setString(contentLanguage);
            }
        }
        if (contentLength != -1) {
            headers.setValue("Content-Length").setLong(contentLength);
            outputBuffer.addActiveFilter
                (outputFilters[Constants.IDENTITY_FILTER]);
            contentDelimitation = true;
        } else {
            if (entityBody && http11 && keepAlive) {
                outputBuffer.addActiveFilter
                    (outputFilters[Constants.CHUNKED_FILTER]);
                contentDelimitation = true;
                headers.addValue(Constants.TRANSFERENCODING).setString(Constants.CHUNKED);
            } else {
                outputBuffer.addActiveFilter
                    (outputFilters[Constants.IDENTITY_FILTER]);
            }
        }

        if (useCompression) {
            outputBuffer.addActiveFilter(outputFilters[Constants.GZIP_FILTER]);
            headers.setValue("Content-Encoding").setString("gzip");
            // Make Proxies happy via Vary (from mod_deflate)
            headers.setValue("Vary").setString("Accept-Encoding");
        }

        // Add date header
        headers.setValue("Date").setString(FastHttpDateFormat.getCurrentDate());

        // FIXME: Add transfer encoding header

        if ((entityBody) && (!contentDelimitation)) {
            // Mark as close the connection after the request, and add the
            // connection: close header
            keepAlive = false;
        }

        // If we know that the request is bad this early, add the
        // Connection: close header.
        keepAlive = keepAlive && !statusDropsConnection(statusCode);
        if (!keepAlive) {
            headers.addValue(Constants.CONNECTION).setString(Constants.CLOSE);
        } else if (!http11 && !error) {
            headers.addValue(Constants.CONNECTION).setString(Constants.KEEPALIVE);
        }

        // Build the response header
        // Add server header
        if (server != null) {
            headers.setValue("Server").setString(server);
        } else {
            outputBuffer.write(Constants.SERVER_BYTES);
        }

        int size = headers.size();
        for (int i = 0; i < size; i++) {
            outputBuffer.sendHeader(headers.getName(i), headers.getValue(i));
        }
        outputBuffer.endHeaders();

    }


    /**
     * Initialize standard input and output filters.
     */
    protected void initializeFilters() {

        // Create and add the identity filters.
        inputBuffer.addFilter(new IdentityInputFilter());
        outputBuffer.addFilter(new IdentityOutputFilter());

        // Create and add the chunked filters.
        inputBuffer.addFilter(new ChunkedInputFilter());
        outputBuffer.addFilter(new ChunkedOutputFilter());

        // Create and add the void filters.
        inputBuffer.addFilter(new VoidInputFilter());
        outputBuffer.addFilter(new VoidOutputFilter());

        // Create and add buffered input filter
        inputBuffer.addFilter(new BufferedInputFilter());

        // Create and add the chunked filters.
        //inputBuffer.addFilter(new GzipInputFilter());
        outputBuffer.addFilter(new GzipOutputFilter());

    }


    /**
     * Add an input filter to the current request.
     *
     * @return false if the encoding was not found (which would mean it is
     * unsupported)
     */
    protected boolean addInputFilter(InputFilter[] inputFilters,
                                     String encodingName) {
        if (encodingName.equals("identity")) {
            // Skip
        } else if (encodingName.equals("chunked")) {
            inputBuffer.addActiveFilter
                (inputFilters[Constants.CHUNKED_FILTER]);
            contentDelimitation = true;
        } else {
            for (int i = 2; i < inputFilters.length; i++) {
                if (inputFilters[i].getEncodingName()
                    .toString().equals(encodingName)) {
                    inputBuffer.addActiveFilter(inputFilters[i]);
                    return true;
                }
            }
            return false;
        }
        return true;
    }


    /**
     * Specialized utility method: find a sequence of lower case bytes inside
     * a ByteChunk.
     */
    protected int findBytes(ByteChunk bc, byte[] b) {

        byte first = b[0];
        byte[] buff = bc.getBuffer();
        int start = bc.getStart();
        int end = bc.getEnd();

    // Look for first char
    int srcEnd = b.length;

    for (int i = start; i <= (end - srcEnd); i++) {
        if (Ascii.toLower(buff[i]) != first) continue;
        // found first char, now look for a match
            int myPos = i+1;
        for (int srcPos = 1; srcPos < srcEnd; ) {
                if (Ascii.toLower(buff[myPos++]) != b[srcPos++])
            break;
                if (srcPos == srcEnd) return i - start; // found it
        }
    }
    return -1;

    }

    /**
     * Determine if we must drop the connection because of the HTTP status
     * code.  Use the same list of codes as Apache/httpd.
     */
    protected boolean statusDropsConnection(int status) {
        return status == 400 /* SC_BAD_REQUEST */ ||
               status == 408 /* SC_REQUEST_TIMEOUT */ ||
               status == 411 /* SC_LENGTH_REQUIRED */ ||
               status == 413 /* SC_REQUEST_ENTITY_TOO_LARGE */ ||
               status == 414 /* SC_REQUEST_URI_TOO_LARGE */ ||
               status == 500 /* SC_INTERNAL_SERVER_ERROR */ ||
               status == 503 /* SC_SERVICE_UNAVAILABLE */ ||
               status == 501 /* SC_NOT_IMPLEMENTED */;
    }

}
>>>>>>> c705e291c22dbbe2c8e25822fd1911a4481ef618
Solution content
        else {
//                try {
/*
 *  Copyright 1999-2004 The Apache Software Foundation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package org.apache.coyote.http11;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.nio.channels.SocketChannel;
import java.util.StringTokenizer;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

import org.apache.coyote.ActionCode;
import org.apache.coyote.ActionHook;
import org.apache.coyote.Adapter;
import org.apache.coyote.Request;
import org.apache.coyote.RequestInfo;
import org.apache.coyote.Response;
import org.apache.coyote.http11.filters.BufferedInputFilter;
import org.apache.coyote.http11.filters.ChunkedInputFilter;
import org.apache.coyote.http11.filters.ChunkedOutputFilter;
import org.apache.coyote.http11.filters.GzipOutputFilter;
import org.apache.coyote.http11.filters.IdentityInputFilter;
import org.apache.coyote.http11.filters.IdentityOutputFilter;
import org.apache.coyote.http11.filters.SavedRequestInputFilter;
import org.apache.coyote.http11.filters.VoidInputFilter;
import org.apache.coyote.http11.filters.VoidOutputFilter;
import org.apache.tomcat.util.buf.Ascii;
import org.apache.tomcat.util.buf.ByteChunk;
import org.apache.tomcat.util.buf.HexUtils;
import org.apache.tomcat.util.buf.MessageBytes;
import org.apache.tomcat.util.http.FastHttpDateFormat;
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.net.NioEndpoint;
import org.apache.tomcat.util.net.NioEndpoint.Handler;
import org.apache.tomcat.util.net.NioEndpoint.Handler.SocketState;
import org.apache.tomcat.util.res.StringManager;
import java.nio.channels.SelectionKey;


/**
 * Processes HTTP requests.
 *
 * @author Remy Maucherat
 * @author Filip Hanik
 */
public class Http11NioProcessor implements ActionHook {


    /**
     * Logger.
     */
        localPort = -1;


            }
            }

        }
    /**
    protected static org.apache.commons.logging.Log log
        = org.apache.commons.logging.LogFactory.getLog(Http11NioProcessor.class);

    /**
     * The string manager for this package.
     */
    protected static StringManager sm =
        StringManager.getManager(Constants.Package);


    // ----------------------------------------------------------- Constructors


    public Http11NioProcessor(int headerBufferSize, NioEndpoint endpoint) {

        this.endpoint = endpoint;

        request = new Request();
        int readTimeout = endpoint.getFirstReadTimeout();
        if (readTimeout == 0) {
            readTimeout = 100;
        } else if (readTimeout < 0) {
            readTimeout = timeout;
            //readTimeout = -1;
        }
        inputBuffer = new InternalNioInputBuffer(request, headerBufferSize,readTimeout);
        inputBuffer.setPoller(endpoint.getPoller());
        request.setInputBuffer(inputBuffer);

        response = new Response();
        response.setHook(this);
        outputBuffer = new InternalNioOutputBuffer(response, headerBufferSize);
        response.setOutputBuffer(outputBuffer);
        request.setResponse(response);

        ssl = !"off".equalsIgnoreCase(endpoint.getSSLEngine());

        initializeFilters();

        // Cause loading of HexUtils
        int foo = HexUtils.DEC[0];

        // Cause loading of FastHttpDateFormat
        FastHttpDateFormat.getCurrentDate();

    }


    // ----------------------------------------------------- Instance Variables


    /**
     * Associated adapter.
     */
    protected Adapter adapter = null;


    /**
     * Request object.
     */
    protected Request request = null;


    /**
     * Response object.
     */
    protected Response response = null;


    /**
     * Input.
     */
    protected InternalNioInputBuffer inputBuffer = null;


    /**
     * Output.
     */
    protected InternalNioOutputBuffer outputBuffer = null;


    /**
     * Error flag.
     */
    protected boolean error = false;


    /**
     * Keep-alive.
     */
    protected boolean keepAlive = true;


    /**
     * HTTP/1.1 flag.
     */
    protected boolean http11 = true;


    /**
     * HTTP/0.9 flag.
     */
    protected boolean http09 = false;



    /**
     * Comet used.
     */
    protected boolean comet = false;
    
    /**
     * Closed flag, a Comet async thread can 
     * signal for this Nio processor to be closed and recycled instead
     * of waiting for a timeout.
     * Closed by HttpServletResponse.getWriter().close()
     */
    }
        }
    }

    /**
    protected boolean cometClose = false;

    /**
     * Content delimitator for the request (if false, the connection will
     * be closed at the end of the request).
     */
    protected boolean contentDelimitation = true;


    /**
     * Is there an expectation ?
     */
    protected boolean expectation = false;


    /**
     * List of restricted user agents.
     */
    protected Pattern[] restrictedUserAgents = null;


    /**
     * Maximum number of Keep-Alive requests to honor.
     */
    protected int maxKeepAliveRequests = -1;


    /**
     * SSL enabled ?
     */
    protected boolean ssl = false;


    /**
     * Socket associated with the current connection.
     */
    protected SocketChannel socket = null;


    /**
     * Remote Address associated with the current connection.
     */
    protected String remoteAddr = null;


    /**
     * Remote Host associated with the current connection.
     */
    protected String remoteHost = null;


    /**
     * Local Host associated with the current connection.
     */
    protected String localName = null;



    /**
     * Local port to which the socket is connected
     */
    protected int localPort = -1;


    /**
     * Remote port to which the socket is connected
     */
    protected int remotePort = -1;


    /**
     * The local Host address.
     */
    protected String localAddr = null;


    /**
     * Maximum timeout on uploads. 5 minutes as in Apache HTTPD server.
     */
    protected int timeout = 300000;


    /**
     * Flag to disable setting a different time-out on uploads.
     */
    protected boolean disableUploadTimeout = false;


    /**
     * Allowed compression level.
     */
    protected int compressionLevel = 0;


    /**
     * Minimum contentsize to make compression.
     */
    protected int compressionMinSize = 2048;


    /**
     * Socket buffering.
     */
    protected int socketBuffer = -1;


    /**
     * Max save post size.
     */
    protected int maxSavePostSize = 4 * 1024;


    /**
     * List of user agents to not use gzip with
     */
    protected Pattern noCompressionUserAgents[] = null;

    /**
     * List of MIMES which could be gzipped
     */
    protected String[] compressableMimeTypes =
    { "text/html", "text/xml", "text/plain" };


    /**
     * Host name (used to avoid useless B2C conversion on the host name).
     */
    protected char[] hostNameC = new char[0];


    /**
     * Associated endpoint.
     */
    protected NioEndpoint endpoint;


    /**
     * Allow a customized the server header for the tin-foil hat folks.
     */
    protected String server = null;


    // ------------------------------------------------------------- Properties


    /**
     * Return compression level.
     */
    public String getCompression() {
        switch (compressionLevel) {
        case 0:
            return "off";
        case 1:
            return "on";
        case 2:
            return "force";
        }
        return "off";
    }


    /**
     * Set compression level.
     */
    public void setCompression(String compression) {
        if (compression.equals("on")) {
            this.compressionLevel = 1;
        } else if (compression.equals("force")) {
            this.compressionLevel = 2;
        } else if (compression.equals("off")) {
            this.compressionLevel = 0;
        } else {
            try {
                // Try to parse compression as an int, which would give the
                // minimum compression size
                compressionMinSize = Integer.parseInt(compression);
                this.compressionLevel = 1;
            } catch (Exception e) {
                this.compressionLevel = 0;
     * Set Minimum size to trigger compression.
     */
    public void setCompressionMinSize(int compressionMinSize) {
        this.compressionMinSize = compressionMinSize;
    }


    /**
     * Add user-agent for which gzip compression didn't works
     * The user agent String given will be exactly matched
     * to the user-agent header submitted by the client.
     *
     * @param userAgent user-agent string
     */
    public void addNoCompressionUserAgent(String userAgent) {
        try {
            Pattern nRule = Pattern.compile(userAgent);
            noCompressionUserAgents =
                addREArray(noCompressionUserAgents, nRule);
        } catch (PatternSyntaxException pse) {
            log.error(sm.getString("http11processor.regexp.error", userAgent), pse);
        }
    }


    /**
     * Set no compression user agent list (this method is best when used with
     * a large number of connectors, where it would be better to have all of
     * them referenced a single array).
     */
    public void setNoCompressionUserAgents(Pattern[] noCompressionUserAgents) {
        this.noCompressionUserAgents = noCompressionUserAgents;
    }


    /**
     * Set no compression user agent list.
     * List contains users agents separated by ',' :
     *
     * ie: "gorilla,desesplorer,tigrus"
     */
    public void setNoCompressionUserAgents(String noCompressionUserAgents) {
        if (noCompressionUserAgents != null) {
            StringTokenizer st = new StringTokenizer(noCompressionUserAgents, ",");

            while (st.hasMoreTokens()) {
                addNoCompressionUserAgent(st.nextToken().trim());
            }
     * Add a mime-type which will be compressable
     * The mime-type String will be exactly matched
     * in the response mime-type header .
     *
     * @param mimeType mime-type string
     */
    public void addCompressableMimeType(String mimeType) {
        compressableMimeTypes =
            addStringArray(compressableMimeTypes, mimeType);
    }


    /**
     * Set compressable mime-type list (this method is best when used with
     * a large number of connectors, where it would be better to have all of
     * them referenced a single array).
     */
    public void setCompressableMimeTypes(String[] compressableMimeTypes) {
        this.compressableMimeTypes = compressableMimeTypes;
    }


    /**
     * Set compressable mime-type list
     * List contains users agents separated by ',' :
     *
     * ie: "text/html,text/xml,text/plain"
     */
    public void setCompressableMimeTypes(String compressableMimeTypes) {
        if (compressableMimeTypes != null) {
            StringTokenizer st = new StringTokenizer(compressableMimeTypes, ",");

            while (st.hasMoreTokens()) {
                addCompressableMimeType(st.nextToken().trim());
            }
        }
    }


    /**
     * Return the list of restricted user agents.
     */
    public String[] findCompressableMimeTypes() {
        return (compressableMimeTypes);
    }



    // --------------------------------------------------------- Public Methods


    /**
     * Add input or output filter.
     *
     * @param className class name of the filter
     */
    protected void addFilter(String className) {
        try {
            Class clazz = Class.forName(className);
            Object obj = clazz.newInstance();
            if (obj instanceof InputFilter) {
                inputBuffer.addFilter((InputFilter) obj);
            } else if (obj instanceof OutputFilter) {
                outputBuffer.addFilter((OutputFilter) obj);
            } else {
                log.warn(sm.getString("http11processor.filter.unknown", className));
            }
        } catch (Exception e) {
            log.error(sm.getString("http11processor.filter.error", className), e);
        }
    }


    /**
     * General use method
     *
     * @param sArray the StringArray
     * @param value string
     */
    private String[] addStringArray(String sArray[], String value) {
        String[] result = null;
        if (sArray == null) {
            result = new String[1];
            result[0] = value;
        }
        else {
            result = new String[sArray.length + 1];
            for (int i = 0; i < sArray.length; i++)
                result[i] = sArray[i];
            result[sArray.length] = value;
        }
        return result;
    }


    /**
     * General use method
     *
     * @param rArray the REArray
     * @param value Obj
     */
    private Pattern[] addREArray(Pattern rArray[], Pattern value) {
        Pattern[] result = null;
        if (rArray == null) {
            result = new Pattern[1];
            result[0] = value;
        }
            result = new Pattern[rArray.length + 1];
            for (int i = 0; i < rArray.length; i++)
                result[i] = rArray[i];
            result[rArray.length] = value;
        }
        return result;
    }


    /**
     * General use method
     *
     * @param sArray the StringArray
     * @param value string
     */
    private boolean inStringArray(String sArray[], String value) {
        for (int i = 0; i < sArray.length; i++) {
            if (sArray[i].equals(value)) {
                return true;
            }
        }
        return false;
    }


    /**
     * Checks if any entry in the string array starts with the specified value
     *
     * @param sArray the StringArray
     * @param value string
     */
    private boolean startsWithStringArray(String sArray[], String value) {
        if (value == null)
           return false;
        for (int i = 0; i < sArray.length; i++) {
            if (value.startsWith(sArray[i])) {
                return true;
            }
        }
        return false;
    }


    /**
     * Add restricted user-agent (which will downgrade the connector
     * to HTTP/1.0 mode). The user agent String given will be matched
     * via regexp to the user-agent header submitted by the client.
     *
     * @param userAgent user-agent string
     */
    public void addRestrictedUserAgent(String userAgent) {
        try {
            Pattern nRule = Pattern.compile(userAgent);
            restrictedUserAgents = addREArray(restrictedUserAgents, nRule);
        } catch (PatternSyntaxException pse) {
            log.error(sm.getString("http11processor.regexp.error", userAgent), pse);
        }
    }


    /**
     * Set restricted user agent list (this method is best when used with
     * a large number of connectors, where it would be better to have all of
     * them referenced a single array).
     */
    public void setRestrictedUserAgents(Pattern[] restrictedUserAgents) {
        this.restrictedUserAgents = restrictedUserAgents;
    }


    /**
     * Set restricted user agent list (which will downgrade the connector
     * to HTTP/1.0 mode). List contains users agents separated by ',' :
     *
     * ie: "gorilla,desesplorer,tigrus"
     */
    public void setRestrictedUserAgents(String restrictedUserAgents) {
        if (restrictedUserAgents != null) {
            StringTokenizer st =
                new StringTokenizer(restrictedUserAgents, ",");
            while (st.hasMoreTokens()) {
                addRestrictedUserAgent(st.nextToken().trim());
            }
        }
    }


    /**
     * Return the list of restricted user agents.
     */
    public String[] findRestrictedUserAgents() {
        String[] sarr = new String [restrictedUserAgents.length];

        for (int i = 0; i < restrictedUserAgents.length; i++)
            sarr[i] = restrictedUserAgents[i].toString();

        return (sarr);
    }


    /**
     * Set the maximum number of Keep-Alive requests to honor.
     * This is to safeguard from DoS attacks.  Setting to a negative
     * value disables the check.
     */
    public void setMaxKeepAliveRequests(int mkar) {
        maxKeepAliveRequests = mkar;
    }


    /**
     * Return the number of Keep-Alive requests that we will honor.
     */
    public int getMaxKeepAliveRequests() {
        return maxKeepAliveRequests;
    }


    /**
     * Set the maximum size of a POST which will be buffered in SSL mode.
     */
    public void setMaxSavePostSize(int msps) {
        maxSavePostSize = msps;
    }


    /**
     * Return the maximum size of a POST which will be buffered in SSL mode.
     */
    public int getMaxSavePostSize() {
        return maxSavePostSize;
    }


    /**
     * Set the flag to control upload time-outs.
     */
    public void setDisableUploadTimeout(boolean isDisabled) {
        disableUploadTimeout = isDisabled;
    }

    /**
     * Get the flag that controls upload time-outs.
     */
    public boolean getDisableUploadTimeout() {
        return disableUploadTimeout;
    }

    /**
     * Set the socket buffer flag.
     */
    public void setSocketBuffer(int socketBuffer) {
        this.socketBuffer = socketBuffer;
        outputBuffer.setSocketBuffer(socketBuffer);
    }

    /**
     * Get the socket buffer flag.
     */
    public int getSocketBuffer() {
        return socketBuffer;
    }

    /**
     * Set the upload timeout.
     */
    public void setTimeout( int timeouts ) {
        timeout = timeouts ;
    }

    /**
     * Get the upload timeout.
     */
    public int getTimeout() {
        return timeout;
    }


    /**
     * Set the server header name.
     */
    public void setServer( String server ) {
        if (server==null || server.equals("")) {
            this.server = null;
        } else {
            this.server = server;
        }
    }

    /**
     * Get the server header name.
     */
    public String getServer() {
        return server;
    }


    /** Get the request associated with this processor.
     *
     * @return The request
     */
    public Request getRequest() {
        return request;
    }

    /**
     * Process pipelined HTTP requests using the specified input and output
     * streams.
     *
     * @throws IOException error during an I/O operation
     */
    public SocketState event(boolean error)
        throws IOException {

        RequestInfo rp = request.getRequestProcessor();

        try {
            rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
            error = !adapter.event(request, response, error);
            if (request.getAttribute("org.apache.tomcat.comet") == null) {
                comet = false;
            }
            SelectionKey key = socket.keyFor(endpoint.getPoller().getSelector());
            if ( key != null ) {
                NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
                if ( attach!=null ) {
                    attach.setComet(comet);
                    Integer comettimeout = (Integer)request.getAttribute("org.apache.tomcat.comet.timeout");
                    if ( comettimeout != null ) attach.setTimeout(comettimeout.longValue());
                }
            }
            
        } catch (InterruptedIOException e) {
            error = true;
        } catch (Throwable t) {
            log.error(sm.getString("http11processor.request.process"), t);
            // 500 - Internal Server Error
            response.setStatus(500);
            error = true;
        }

        rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

        if (error) {
            recycle();
            return SocketState.CLOSED;
        } else if (!comet) {
            recycle();
            endpoint.getPoller().add(socket);
            return SocketState.OPEN;
        } else {
            endpoint.getCometPoller().add(socket);
            return SocketState.LONG;
        }
    }

    /**
     * Process pipelined HTTP requests using the specified input and output
     * streams.
     *
     * @throws IOException error during an I/O operation
     */
    public SocketState process(SocketChannel socket)
        throws IOException {
        RequestInfo rp = request.getRequestProcessor();
        rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

        // Set the remote address
        remoteAddr = null;
        remoteHost = null;
        localAddr = null;
        localName = null;
        remotePort = -1;
        // Setting up the socket
        this.socket = socket;
        inputBuffer.setSocket(socket);
        outputBuffer.setSocket(socket);
        outputBuffer.setSelector(endpoint.getPoller().getSelector());

        // Error flag
        error = false;
        keepAlive = true;

        int keepAliveLeft = maxKeepAliveRequests;
        long soTimeout = endpoint.getSoTimeout();

        int limit = 0;
        if (endpoint.getFirstReadTimeout() > 0 || endpoint.getFirstReadTimeout() < -1) {
            limit = endpoint.getMaxThreads() / 2;
        }

        boolean keptAlive = false;
        boolean openSocket = false;

        while (!error && keepAlive && !comet) {

            // Parsing the request header
            try {
                if( !disableUploadTimeout && keptAlive && soTimeout > 0 ) {
                    socket.socket().setSoTimeout((int)soTimeout);
                    inputBuffer.readTimeout = soTimeout;
                }
                if (!inputBuffer.parseRequestLine
                        (keptAlive && (endpoint.getCurrentThreadsBusy() > limit))) {
                    // This means that no data is available right now
                    // (long keepalive), so that the processor should be recycled
                    // and the method should return true
                    openSocket = true;
                    // Add the socket to the poller
                    endpoint.getPoller().add(socket);
                    break;
                }
                request.setStartTime(System.currentTimeMillis());
                keptAlive = true;
                if (!disableUploadTimeout) {
                    socket.socket().setSoTimeout((int)timeout);
                    inputBuffer.readTimeout = soTimeout;
                }
                inputBuffer.parseHeaders();
            } catch (IOException e) {
                error = true;
                break;
            } catch (Throwable t) {
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("http11processor.header.parse"), t);
                }
                // 400 - Bad Request
                response.setStatus(400);
                error = true;
            }

            // Setting up filters, and parse some request headers
            rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
            try {
                prepareRequest();
            } catch (Throwable t) {
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("http11processor.request.prepare"), t);
                }
                // 400 - Internal Server Error
                response.setStatus(400);
                error = true;
            }

            if (maxKeepAliveRequests > 0 && --keepAliveLeft == 0)
                keepAlive = false;

            // Process the request in the adapter
            if (!error) {
                try {
                    rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                    adapter.service(request, response);
                    // Handle when the response was committed before a serious
                    // error occurred.  Throwing a ServletException should both
                    // set the status to 500 and set the errorException.
                    // If we fail here, then the response is likely already
                    // committed, so we can't try and set headers.
                    if(keepAlive && !error) { // Avoid checking twice.
                        error = response.getErrorException() != null ||
                                statusDropsConnection(response.getStatus());
                    }
                    // Comet support
                    if (request.getAttribute("org.apache.tomcat.comet") != null) {
                        comet = true;
                    }
                    SelectionKey key = socket.keyFor(endpoint.getPoller().getSelector());
                    if (key != null) {
                        NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
                        if (attach != null)  {
                            attach.setComet(comet);
                            Integer comettimeout = (Integer) request.getAttribute("org.apache.tomcat.comet.timeout");
                            if (comettimeout != null) attach.setTimeout(comettimeout.longValue());
                        }
                    }
                } catch (InterruptedIOException e) {
                    error = true;
                } catch (Throwable t) {
                    log.error(sm.getString("http11processor.request.process"), t);
                    // 500 - Internal Server Error
                    response.setStatus(500);
                    error = true;
                }
            }

            // Finish the handling of the request
            if (!comet) {
                endRequest();
            }

            // If there was an error, make sure the request is counted as
            // and error, and update the statistics counter
            if (error) {
                response.setStatus(500);
            }
            request.updateCounters();

            rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);

        }

        rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

        if (comet) {
            if (error) {
                recycle();
                return SocketState.CLOSED;
            } else {
//                    // Cipher suite
                request.getInputBuffer();
                return SocketState.LONG;
            }
        } else {
            recycle();
            return (openSocket) ? SocketState.OPEN : SocketState.CLOSED;
        }

    }


    public void endRequest() {

        // Finish the handling of the request
        try {
            inputBuffer.endRequest();
        } catch (IOException e) {
            error = true;
        } catch (Throwable t) {
            log.error(sm.getString("http11processor.request.finish"), t);
            // 500 - Internal Server Error
            response.setStatus(500);
            error = true;
        }
        try {
            outputBuffer.endRequest();
        } catch (IOException e) {
            error = true;
        } catch (Throwable t) {
            log.error(sm.getString("http11processor.response.finish"), t);
            error = true;
        }

        // Next request
        inputBuffer.nextRequest();
        outputBuffer.nextRequest();

    }


    public void recycle() {
        inputBuffer.recycle();
        outputBuffer.recycle();
        this.socket = null;
        this.cometClose = false;
        this.comet = false;
    }


    // ----------------------------------------------------- ActionHook Methods


    /**
     * Send an action to the connector.
     *
     * @param actionCode Type of the action
     * @param param Action parameter
     */
    public void action(ActionCode actionCode, Object param) {

        if (actionCode == ActionCode.ACTION_COMMIT) {
            // Commit current response

            if (response.isCommitted())
                return;

            // Validate and write response headers
            prepareResponse();
            try {
                outputBuffer.commit();
            } catch (IOException e) {
                // Set error flag
                error = true;
            }

        } else if (actionCode == ActionCode.ACTION_ACK) {

            // Acknowlege request

            // Send a 100 status back if it makes sense (response not committed
            // yet, and client specified an expectation for 100-continue)

            if ((response.isCommitted()) || !expectation)
                return;

            inputBuffer.setSwallowInput(true);
            try {
                outputBuffer.sendAck();
            } catch (IOException e) {
                // Set error flag
                error = true;
            }

        } else if (actionCode == ActionCode.ACTION_CLIENT_FLUSH) {

            try {
                outputBuffer.flush();
            } catch (IOException e) {
                // Set error flag
                error = true;
                response.setErrorException(e);
            }

        } else if (actionCode == ActionCode.ACTION_CLOSE) {
            // Close

            // End the processing of the current request, and stop any further
            // transactions with the client

            comet = false;
            cometClose = true;
            SelectionKey key = socket.keyFor(endpoint.getPoller().getSelector());
            if ( key != null ) {
                NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
                if ( attach!=null && attach.getComet()) {
                    //if this is a comet connection
                    //then execute the connection closure at the next selector loop
                    request.getAttributes().remove("org.apache.tomcat.comet.timeout");
                    attach.setError(true);
                }
            }

            try {
                outputBuffer.endRequest();
            } catch (IOException e) {
                // Set error flag
                error = true;
            }

        } else if (actionCode == ActionCode.ACTION_RESET) {

            // Reset response

            // Note: This must be called before the response is committed

            outputBuffer.reset();

        } else if (actionCode == ActionCode.ACTION_CUSTOM) {

            // Do nothing

        } else if (actionCode == ActionCode.ACTION_REQ_HOST_ADDR_ATTRIBUTE) {

            // Get remote host address
            if ((remoteAddr == null) && (socket != null)) {
                InetAddress inetAddr = socket.socket().getInetAddress();
                if (inetAddr != null) {
                    remoteAddr = inetAddr.getHostAddress();
                }
            }
            request.remoteAddr().setString(remoteAddr);

        } else if (actionCode == ActionCode.ACTION_REQ_LOCAL_NAME_ATTRIBUTE) {

            // Get local host name
            if ((localName == null) && (socket != null)) {
                InetAddress inetAddr = socket.socket().getLocalAddress();
                if (inetAddr != null) {
                    localName = inetAddr.getHostName();
                }
            }
            request.localName().setString(localName);
        } else if (actionCode == ActionCode.ACTION_REQ_HOST_ATTRIBUTE) {

            // Get remote host name
            if ((remoteHost == null) && (socket != null)) {
                InetAddress inetAddr = socket.socket().getInetAddress();
                if (inetAddr != null) {
                    remoteHost = inetAddr.getHostName();
                }
                if(remoteHost == null) {
                    if(remoteAddr != null) {
                        remoteHost = remoteAddr;
                    } else { // all we can do is punt
                        request.remoteHost().recycle();
                    }
                }
            }
            request.remoteHost().setString(remoteHost);

        } else if (actionCode == ActionCode.ACTION_REQ_LOCAL_ADDR_ATTRIBUTE) {

            if (localAddr == null)
               localAddr = socket.socket().getLocalAddress().getHostAddress();

            request.localAddr().setString(localAddr);

        } else if (actionCode == ActionCode.ACTION_REQ_REMOTEPORT_ATTRIBUTE) {

            if ((remotePort == -1 ) && (socket !=null)) {
                remotePort = socket.socket().getPort();
            }
            request.setRemotePort(remotePort);

        } else if (actionCode == ActionCode.ACTION_REQ_LOCALPORT_ATTRIBUTE) {

            if ((localPort == -1 ) && (socket !=null)) {
                localPort = socket.socket().getLocalPort();
            }
            request.setLocalPort(localPort);

        } else if (actionCode == ActionCode.ACTION_REQ_SSL_ATTRIBUTE ) {

//            if (ssl && (socket != 0)) {
//                    Object sslO = SSLSocket.getInfoS(socket, SSL.SSL_INFO_CIPHER);
//                    if (sslO != null) {
//                        request.setAttribute
//                            (NioEndpoint.CIPHER_SUITE_KEY, sslO);
//                    }
//                    // Client certificate chain if present
//                    int certLength = SSLSocket.getInfoI(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN);
//                    X509Certificate[] certs = null;
//                    if (certLength > 0) {
//                        certs = new X509Certificate[certLength];
//                        for (int i = 0; i < certLength; i++) {
//                            byte[] data = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN + i);
//                            CertificateFactory cf =
//                                CertificateFactory.getInstance("X.509");
//                            ByteArrayInputStream stream = new ByteArrayInputStream(data);
//                            certs[i] = (X509Certificate) cf.generateCertificate(stream);
//                        }
//                    }
//                    if (certs != null) {
//                        request.setAttribute
//                            (NioEndpoint.CERTIFICATE_KEY, certs);
//                    }
//                    // User key size
//                    sslO = new Integer(SSLSocket.getInfoI(socket, SSL.SSL_INFO_CIPHER_USEKEYSIZE));
//                    if (sslO != null) {
//                        request.setAttribute
//                            (NioEndpoint.KEY_SIZE_KEY, sslO);
//                    }
//                    // SSL session ID
//                    sslO = SSLSocket.getInfoS(socket, SSL.SSL_INFO_SESSION_ID);
//                    if (sslO != null) {
//                        request.setAttribute
//                            (NioEndpoint.SESSION_ID_KEY, sslO);
//                    }
//                } catch (Exception e) {
//                    log.warn(sm.getString("http11processor.socket.ssl"), e);
//                }
//            }

        } else if (actionCode == ActionCode.ACTION_REQ_SSL_CERTIFICATE) {

//            if (ssl && (socket != 0)) {
//                 // Consume and buffer the request body, so that it does not
//                 // interfere with the client's handshake messages
//                InputFilter[] inputFilters = inputBuffer.getFilters();
//                ((BufferedInputFilter) inputFilters[Constants.BUFFERED_FILTER])
//                    .setLimit(maxSavePostSize);
//                inputBuffer.addActiveFilter
//                    (inputFilters[Constants.BUFFERED_FILTER]);
//                try {
//                    // Renegociate certificates
//                    SSLSocket.renegotiate(socket);
//                    // Client certificate chain if present
//                    int certLength = SSLSocket.getInfoI(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN);
//                    X509Certificate[] certs = null;
//                    if (certLength > 0) {
//                        certs = new X509Certificate[certLength];
//                        for (int i = 0; i < certLength; i++) {
//                            byte[] data = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN + i);
//                            CertificateFactory cf =
//                                CertificateFactory.getInstance("X.509");
//                            ByteArrayInputStream stream = new ByteArrayInputStream(data);
//                            certs[i] = (X509Certificate) cf.generateCertificate(stream);
//                        }
//                    }
//                    if (certs != null) {
//                        request.setAttribute
//                            (NioEndpoint.CERTIFICATE_KEY, certs);
//                    }
//                } catch (Exception e) {
//                    log.warn(sm.getString("http11processor.socket.ssl"), e);
//                }
//            }

        } else if (actionCode == ActionCode.ACTION_REQ_SET_BODY_REPLAY) {
            ByteChunk body = (ByteChunk) param;

            InputFilter savedBody = new SavedRequestInputFilter(body);
            savedBody.setRequest(request);

            InternalNioInputBuffer internalBuffer = (InternalNioInputBuffer)
            internalBuffer.addActiveFilter(savedBody);
        }

    }


    // ------------------------------------------------------ Connector Methods


    /**
     * Set the associated adapter.
     *
     * @param adapter the new adapter
     */
    public void setAdapter(Adapter adapter) {
        this.adapter = adapter;
    }


    /**
     * Get the associated adapter.
     *
     * @return the associated adapter
     */
    public Adapter getAdapter() {
        return adapter;
    }


    // ------------------------------------------------------ Protected Methods


    /**
     * After reading the request headers, we have to setup the request filters.
     */
    protected void prepareRequest() {

        http11 = true;
        http09 = false;
        contentDelimitation = false;
        expectation = false;
        if (ssl) {
            request.scheme().setString("https");
        }
        MessageBytes protocolMB = request.protocol();
        if (protocolMB.equals(Constants.HTTP_11)) {
            http11 = true;
            protocolMB.setString(Constants.HTTP_11);
        } else if (protocolMB.equals(Constants.HTTP_10)) {
            http11 = false;
            keepAlive = false;
            protocolMB.setString(Constants.HTTP_10);
        } else if (protocolMB.equals("")) {
            // HTTP/0.9
            http09 = true;
            http11 = false;
            keepAlive = false;
        } else {
            // Unsupported protocol
            http11 = false;
            error = true;
            // Send 505; Unsupported HTTP version
            response.setStatus(505);
        }

        MessageBytes methodMB = request.method();
        if (methodMB.equals(Constants.GET)) {
            methodMB.setString(Constants.GET);
        } else if (methodMB.equals(Constants.POST)) {
            methodMB.setString(Constants.POST);
        }

        MimeHeaders headers = request.getMimeHeaders();

        // Check connection header
        MessageBytes connectionValueMB = headers.getValue("connection");
        if (connectionValueMB != null) {
            ByteChunk connectionValueBC = connectionValueMB.getByteChunk();
            if (findBytes(connectionValueBC, Constants.CLOSE_BYTES) != -1) {
                keepAlive = false;
            } else if (findBytes(connectionValueBC,
                                 Constants.KEEPALIVE_BYTES) != -1) {
                keepAlive = true;
            }
        }

        MessageBytes expectMB = null;
        if (http11)
            expectMB = headers.getValue("expect");
        if ((expectMB != null)
            && (expectMB.indexOfIgnoreCase("100-continue", 0) != -1)) {
            inputBuffer.setSwallowInput(false);
            expectation = true;
        }

        // Check user-agent header
        if ((restrictedUserAgents != null) && ((http11) || (keepAlive))) {
            MessageBytes userAgentValueMB = headers.getValue("user-agent");
            // Check in the restricted list, and adjust the http11
            // and keepAlive flags accordingly
            if(userAgentValueMB != null) {
                String userAgentValue = userAgentValueMB.toString();
                for (int i = 0; i < restrictedUserAgents.length; i++) {
                    if (restrictedUserAgents[i].matcher(userAgentValue).matches()) {
                        http11 = false;
                        keepAlive = false;
                        break;
                    }
                }
            }
        }

        // Check for a full URI (including protocol://host:port/)
        ByteChunk uriBC = request.requestURI().getByteChunk();
        if (uriBC.startsWithIgnoreCase("http", 0)) {

            int pos = uriBC.indexOf("://", 0, 3, 4);
            int uriBCStart = uriBC.getStart();
            int slashPos = -1;
            if (pos != -1) {
                byte[] uriB = uriBC.getBytes();
                slashPos = uriBC.indexOf('/', pos + 3);
                if (slashPos == -1) {
                    slashPos = uriBC.getLength();
                    // Set URI as "/"
                    request.requestURI().setBytes
                        (uriB, uriBCStart + pos + 1, 1);
                } else {
                    request.requestURI().setBytes
                        (uriB, uriBCStart + slashPos,
                         uriBC.getLength() - slashPos);
                }
                MessageBytes hostMB = headers.setValue("host");
                hostMB.setBytes(uriB, uriBCStart + pos + 3,
                                slashPos - pos - 3);
            }

        }

        // Input filter setup
        InputFilter[] inputFilters = inputBuffer.getFilters();

        // Parse transfer-encoding header
        MessageBytes transferEncodingValueMB = null;
        if (http11)
            transferEncodingValueMB = headers.getValue("transfer-encoding");
        if (transferEncodingValueMB != null) {
            String transferEncodingValue = transferEncodingValueMB.toString();
            // Parse the comma separated list. "identity" codings are ignored
            int startPos = 0;
            int commaPos = transferEncodingValue.indexOf(',');
            String encodingName = null;
            while (commaPos != -1) {
                encodingName = transferEncodingValue.substring
                    (startPos, commaPos).toLowerCase().trim();
                if (!addInputFilter(inputFilters, encodingName)) {
                    // Unsupported transfer encoding
                    error = true;
                    // 501 - Unimplemented
                    response.setStatus(501);
                }
                startPos = commaPos + 1;
                commaPos = transferEncodingValue.indexOf(',', startPos);
            }
            encodingName = transferEncodingValue.substring(startPos)
                .toLowerCase().trim();
            if (!addInputFilter(inputFilters, encodingName)) {
                // Unsupported transfer encoding
                error = true;
                // 501 - Unimplemented
                response.setStatus(501);
            }
        }

        // Parse content-length header
        long contentLength = request.getContentLengthLong();
        if (contentLength >= 0 && !contentDelimitation) {
            inputBuffer.addActiveFilter
                (inputFilters[Constants.IDENTITY_FILTER]);
            contentDelimitation = true;
        }

        MessageBytes valueMB = headers.getValue("host");

        // Check host header
        if (http11 && (valueMB == null)) {
            error = true;
            // 400 - Bad request
            response.setStatus(400);
        }

        parseHost(valueMB);

        if (!contentDelimitation) {
            // If there's no content length 
            // (broken HTTP/1.0 or HTTP/1.1), assume
            // the client is not broken and didn't send a body
     *
            inputBuffer.addActiveFilter
                    (inputFilters[Constants.VOID_FILTER]);
            contentDelimitation = true;
        }

        // Advertise comet support through a request attribute
        request.setAttribute("org.apache.tomcat.comet.support", Boolean.TRUE);
        // Advertise comet timeout support
        request.setAttribute("org.apache.tomcat.comet.timeout.support", Boolean.TRUE);

    }


    /**
     * Parse host.
     */
    public void parseHost(MessageBytes valueMB) {

        if (valueMB == null || valueMB.isNull()) {
            // HTTP/1.0
            // Default is what the socket tells us. Overriden if a host is
            // found/parsed
            request.setServerPort(endpoint.getPort());
            return;
        }

        ByteChunk valueBC = valueMB.getByteChunk();
        byte[] valueB = valueBC.getBytes();
        int valueL = valueBC.getLength();
        int valueS = valueBC.getStart();
        int colonPos = -1;
        if (hostNameC.length < valueL) {
            hostNameC = new char[valueL];
        }

        boolean ipv6 = (valueB[valueS] == '[');
        boolean bracketClosed = false;
        for (int i = 0; i < valueL; i++) {
            char b = (char) valueB[i + valueS];
            hostNameC[i] = b;
            if (b == ']') {
                bracketClosed = true;
            } else if (b == ':') {
                if (!ipv6 || bracketClosed) {
                    colonPos = i;
                    break;
                }
            }
        }

        if (colonPos < 0) {
            if (!ssl) {
                // 80 - Default HTTP port
                request.setServerPort(80);
            } else {
                // 443 - Default HTTPS port
                request.setServerPort(443);
            request.serverName().setChars(hostNameC, 0, valueL);
        } else {

            request.serverName().setChars(hostNameC, 0, colonPos);

            int port = 0;
            int mult = 1;
            for (int i = valueL - 1; i > colonPos; i--) {
                int charValue = HexUtils.DEC[(int) valueB[i + valueS]];
                if (charValue == -1) {
                    // Invalid character
                    error = true;
                    // 400 - Bad request
                    response.setStatus(400);
                    break;
                }
                port = port + (charValue * mult);
                mult = 10 * mult;
            }
            request.setServerPort(port);

        }

    }


    /**
     * Check for compression
     */
    private boolean isCompressable() {

        // Nope Compression could works in HTTP 1.0 also
        // cf: mod_deflate

        // Compression only since HTTP 1.1
        // if (! http11)
        //    return false;

        // Check if browser support gzip encoding
        MessageBytes acceptEncodingMB =
            request.getMimeHeaders().getValue("accept-encoding");

        if ((acceptEncodingMB == null)
            || (acceptEncodingMB.indexOf("gzip") == -1))
            return false;

        // Check if content is not allready gzipped
        MessageBytes contentEncodingMB =
            response.getMimeHeaders().getValue("Content-Encoding");

        if ((contentEncodingMB != null)
            && (contentEncodingMB.indexOf("gzip") != -1))
            return false;

        // If force mode, allways compress (test purposes only)
        if (compressionLevel == 2)
           return true;

        // Check for incompatible Browser
        if (noCompressionUserAgents != null) {
            MessageBytes userAgentValueMB =
                request.getMimeHeaders().getValue("user-agent");
            if(userAgentValueMB != null) {
                String userAgentValue = userAgentValueMB.toString();

                // If one Regexp rule match, disable compression
                for (int i = 0; i < noCompressionUserAgents.length; i++)
                    if (noCompressionUserAgents[i].matcher(userAgentValue).matches())
                        return false;
            }
        }

        // Check if suffisant len to trig the compression
        long contentLength = response.getContentLengthLong();
        if ((contentLength == -1)
            || (contentLength > compressionMinSize)) {
            // Check for compatible MIME-TYPE
            if (compressableMimeTypes != null) {
                return (startsWithStringArray(compressableMimeTypes,
                                              response.getContentType()));
            }
        }

        return false;
    }


    /**
     * When committing the response, we have to validate the set of headers, as
     * well as setup the response filters.
     */
    protected void prepareResponse() {

        boolean entityBody = true;
        contentDelimitation = false;

        OutputFilter[] outputFilters = outputBuffer.getFilters();

        if (http09 == true) {
            // HTTP/0.9
            outputBuffer.addActiveFilter
                (outputFilters[Constants.IDENTITY_FILTER]);
            return;
        }

        int statusCode = response.getStatus();
        if ((statusCode == 204) || (statusCode == 205)
            || (statusCode == 304)) {
            // No entity body
            outputBuffer.addActiveFilter
                (outputFilters[Constants.VOID_FILTER]);
            entityBody = false;
            contentDelimitation = true;
        }

        MessageBytes methodMB = request.method();
        if (methodMB.equals("HEAD")) {
            // No entity body
            outputBuffer.addActiveFilter
                (outputFilters[Constants.VOID_FILTER]);
            contentDelimitation = true;
        }


        // Check for compression
        boolean useCompression = false;
        if (entityBody && (compressionLevel > 0)) {
            useCompression = isCompressable();
            // Change content-length to -1 to force chunking
            if (useCompression) {
                response.setContentLength(-1);
            }
        }

        MimeHeaders headers = response.getMimeHeaders();
        if (!entityBody) {
            response.setContentLength(-1);
        } else {
            String contentType = response.getContentType();
            if (contentType != null) {
                headers.setValue("Content-Type").setString(contentType);
            }
            String contentLanguage = response.getContentLanguage();
            if (contentLanguage != null) {
                headers.setValue("Content-Language")
                    .setString(contentLanguage);
            }
        }

        long contentLength = response.getContentLengthLong();
        if (contentLength != -1) {
            headers.setValue("Content-Length").setLong(contentLength);
            outputBuffer.addActiveFilter
                (outputFilters[Constants.IDENTITY_FILTER]);
            contentDelimitation = true;
        } else {
            if (entityBody && http11 && keepAlive) {
                outputBuffer.addActiveFilter
                    (outputFilters[Constants.CHUNKED_FILTER]);
                contentDelimitation = true;
                headers.addValue(Constants.TRANSFERENCODING).setString(Constants.CHUNKED);
            } else {
                outputBuffer.addActiveFilter
                    (outputFilters[Constants.IDENTITY_FILTER]);
            }
        }

        if (useCompression) {
            outputBuffer.addActiveFilter(outputFilters[Constants.GZIP_FILTER]);
            headers.setValue("Content-Encoding").setString("gzip");
            // Make Proxies happy via Vary (from mod_deflate)
            headers.setValue("Vary").setString("Accept-Encoding");
        }

        // Add date header
        headers.setValue("Date").setString(FastHttpDateFormat.getCurrentDate());

        // FIXME: Add transfer encoding header

        if ((entityBody) && (!contentDelimitation)) {
            // Mark as close the connection after the request, and add the
            // connection: close header
            keepAlive = false;
        }

        // If we know that the request is bad this early, add the
        // Connection: close header.
        keepAlive = keepAlive && !statusDropsConnection(statusCode);
        if (!keepAlive) {
            headers.addValue(Constants.CONNECTION).setString(Constants.CLOSE);
        } else if (!http11 && !error) {
            headers.addValue(Constants.CONNECTION).setString(Constants.KEEPALIVE);
        }

        // Build the response header
        outputBuffer.sendStatus();

        // Add server header
        if (server != null) {
            headers.setValue("Server").setString(server);
        } else {
            outputBuffer.write(Constants.SERVER_BYTES);
        }

        int size = headers.size();
        for (int i = 0; i < size; i++) {
            outputBuffer.sendHeader(headers.getName(i), headers.getValue(i));
        }
        outputBuffer.endHeaders();

    }


    /**
     * Initialize standard input and output filters.
     */
    protected void initializeFilters() {

        // Create and add the identity filters.
        inputBuffer.addFilter(new IdentityInputFilter());
        outputBuffer.addFilter(new IdentityOutputFilter());

        // Create and add the chunked filters.
        inputBuffer.addFilter(new ChunkedInputFilter());
        outputBuffer.addFilter(new ChunkedOutputFilter());

        // Create and add the void filters.
        inputBuffer.addFilter(new VoidInputFilter());
        outputBuffer.addFilter(new VoidOutputFilter());

        // Create and add buffered input filter
        inputBuffer.addFilter(new BufferedInputFilter());

        // Create and add the chunked filters.
        //inputBuffer.addFilter(new GzipInputFilter());
        outputBuffer.addFilter(new GzipOutputFilter());

    }


    /**
     * Add an input filter to the current request.
     * @return false if the encoding was not found (which would mean it is
     * unsupported)
     */
    protected boolean addInputFilter(InputFilter[] inputFilters,
                                     String encodingName) {
        if (encodingName.equals("identity")) {
            // Skip
        } else if (encodingName.equals("chunked")) {
            inputBuffer.addActiveFilter
                (inputFilters[Constants.CHUNKED_FILTER]);
            contentDelimitation = true;
        } else {
            for (int i = 2; i < inputFilters.length; i++) {
                if (inputFilters[i].getEncodingName()
                    .toString().equals(encodingName)) {
                    inputBuffer.addActiveFilter(inputFilters[i]);
                    return true;
                }
            }
            return false;
        }
        return true;
    }


    /**
     * Specialized utility method: find a sequence of lower case bytes inside
     * a ByteChunk.
     */
    protected int findBytes(ByteChunk bc, byte[] b) {

        byte first = b[0];
        byte[] buff = bc.getBuffer();
        int start = bc.getStart();
        int end = bc.getEnd();

    // Look for first char
    int srcEnd = b.length;

    for (int i = start; i <= (end - srcEnd); i++) {
        if (Ascii.toLower(buff[i]) != first) continue;
        // found first char, now look for a match
            int myPos = i+1;
        for (int srcPos = 1; srcPos < srcEnd; ) {
                if (Ascii.toLower(buff[myPos++]) != b[srcPos++])
            break;
                if (srcPos == srcEnd) return i - start; // found it
        }
    }
    return -1;

    }

    /**
     * Determine if we must drop the connection because of the HTTP status
     * code.  Use the same list of codes as Apache/httpd.
     */
    protected boolean statusDropsConnection(int status) {
        return status == 400 /* SC_BAD_REQUEST */ ||
               status == 408 /* SC_REQUEST_TIMEOUT */ ||
               status == 411 /* SC_LENGTH_REQUIRED */ ||
               status == 413 /* SC_REQUEST_ENTITY_TOO_LARGE */ ||
               status == 414 /* SC_REQUEST_URI_TOO_LARGE */ ||
               status == 500 /* SC_INTERNAL_SERVER_ERROR */ ||
               status == 503 /* SC_SERVICE_UNAVAILABLE */ ||
               status == 501 /* SC_NOT_IMPLEMENTED */;
    }

}
File
Http11NioProcessor.java
Developer's decision
Version 2
Kind of conflict
Class declaration
Comment
Import
Package declaration
Chunk
Conflicting content


import org.apache.tomcat.util.net.NioEndpoint;
    /**
<<<<<<< HEAD
/*
 *  Copyright 1999-2004 The Apache Software Foundation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package org.apache.coyote.http11;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

import org.apache.coyote.ActionCode;
import org.apache.coyote.OutputBuffer;
import org.apache.coyote.Response;
import org.apache.tomcat.util.buf.ByteChunk;
import org.apache.tomcat.util.buf.CharChunk;
import org.apache.tomcat.util.buf.MessageBytes;
import org.apache.tomcat.util.http.HttpMessages;
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.res.StringManager;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

/**
 * Output buffer.
 * 
 * @author Remy Maucherat
 * @author Filip Hanik
 */
public class InternalNioOutputBuffer 
    implements OutputBuffer {


    // -------------------------------------------------------------- Constants


    // ----------------------------------------------------------- Constructors
    int bbufLimit = 0;
    
    Selector selector;

    /**
     * Default constructor.
     */
    public InternalNioOutputBuffer(Response response) {
        this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE);
    }


    /**
     * Alternate constructor.
     */
    public InternalNioOutputBuffer(Response response, int headerBufferSize) {

        this.response = response;
        headers = response.getMimeHeaders();

        buf = new byte[headerBufferSize];
        
        if (headerBufferSize < (8 * 1024)) {
            bbufLimit = 6 * 1500;    
        } else {
            bbufLimit = (headerBufferSize / 1500 + 1) * 1500;
        }
        bbuf = ByteBuffer.allocateDirect(bbufLimit);

        outputStreamOutputBuffer = new SocketOutputBuffer();

        filterLibrary = new OutputFilter[0];
        activeFilters = new OutputFilter[0];
        lastActiveFilter = -1;

        committed = false;
        finished = false;

        // Cause loading of HttpMessages
        HttpMessages.getMessage(200);




    /**

        this.total = 0;
    }

    } 
     * The string manager for this package.
     */


    protected static StringManager sm =
        StringManager.getManager(Constants.Package);
    protected OutputBuffer outputStreamOutputBuffer;




    /**
        return filterLibrary;

    }
     * Filter library.


    /**
     * Note: Filter[0] is always the "chunked" filter.
     */
    protected OutputFilter[] filterLibrary;
    // -------------------------------------------------------------- Variables

    }
     * Underlying output buffer.
     */

    /**
    // ----------------------------------------------------- Instance Variables


    /**
     * Associated Coyote response.
     */
    protected Response response;


    /**
     * Headers of the associated request.
     */
    protected MimeHeaders headers;


    /**
     * Committed flag.
     */
    protected boolean committed;


    /**
     * Finished flag.
     */
    protected boolean finished;


    /**
     * Pointer to the current write buffer.
     */
    protected byte[] buf;


    /**
     * Position in the buffer.
     */
    protected int pos;


    /**
     * Underlying socket.
     */
    protected SocketChannel socket;


    /**
     * Active filter (which is actually the top of the pipeline).
     */
    protected OutputFilter[] activeFilters;


    /**
     * Index of the last active filter.
     */
    protected int lastActiveFilter;


    /**
     * Direct byte buffer used for writing.
     */
    protected ByteBuffer bbuf = null;


    // ------------------------------------------------------------- Properties


    /**
     * Set the underlying socket.
     */
    public void setSocket(SocketChannel socket) {
        this.socket = socket;
    }

    public void setSelector(Selector selector) {
        this.selector = selector;
    }

    /**
     * Get the underlying socket input stream.
     */
    public SocketChannel getSocket() {
        return socket;
    }
    /**
     * Set the socket buffer size.
     */
    public void setSocketBuffer(int socketBufferSize) {
        // FIXME: Remove
    }


    /**
     * Add an output filter to the filter library.
     */
    public void addFilter(OutputFilter filter) {

        OutputFilter[] newFilterLibrary = 
            new OutputFilter[filterLibrary.length + 1];
        for (int i = 0; i < filterLibrary.length; i++) {
            newFilterLibrary[i] = filterLibrary[i];
        }
        newFilterLibrary[filterLibrary.length] = filter;
        filterLibrary = newFilterLibrary;

        activeFilters = new OutputFilter[filterLibrary.length];

    }


     * Clear filters.
     */
    public void clearFilters() {

        filterLibrary = new OutputFilter[0];
        lastActiveFilter = -1;
    /**
     * Get filters.
     */
    public OutputFilter[] getFilters() {

     * 

        if (!committed) {
    /**
     * Add an output filter to the filter library.
     */
    public void addActiveFilter(OutputFilter filter) {

        if (lastActiveFilter == -1) {
            filter.setBuffer(outputStreamOutputBuffer);
        } else {
            for (int i = 0; i <= lastActiveFilter; i++) {
                if (activeFilters[i] == filter)
                    return;
            }
            filter.setBuffer(activeFilters[lastActiveFilter]);
        }

        activeFilters[++lastActiveFilter] = filter;

        filter.setResponse(response);

    }


    // --------------------------------------------------------- Public Methods


    /**
     * Flush the response.
     * 
     * @throws IOException an undelying I/O error occured
     */
    public void flush()
        throws IOException {

        if (!committed) {

            // Send the connector a request for commit. The connector should
            // then validate the headers, send them (using sendHeader) and 
            // set the filters accordingly.
            response.action(ActionCode.ACTION_COMMIT, null);

        }

        // Flush the current buffer
        flushBuffer();

    }


    /**
     * Reset current response.
     * @throws IllegalStateException if the response has already been committed
     */
    public void reset() {

        if (committed)
            throw new IllegalStateException(/*FIXME:Put an error message*/);

        // Recycle Request object
        response.recycle();

    }


    /**
     * Recycle the output buffer. This should be called when closing the 
     * connection.
     */
    public void recycle() {

        // Recycle Request object
        response.recycle();
        bbuf.clear();

        socket = null;
        pos = 0;
        lastActiveFilter = -1;
        committed = false;
        finished = false;

    }


    /**
     * End processing of current HTTP request.
     * Note: All bytes of the current request should have been already 
     * consumed. This method only resets all the pointers so that we are ready
     * to parse the next HTTP request.
     */
    public void nextRequest() {

        // Recycle Request object
        response.recycle();

        // Recycle filters
        for (int i = 0; i <= lastActiveFilter; i++) {
            activeFilters[i].recycle();
        }

        // Reset pointers
        pos = 0;
        lastActiveFilter = -1;
        committed = false;
        finished = false;

    }


    /**
     * End request.
     * 
     * @throws IOException an undelying I/O error occured
     */
    public void endRequest()
        throws IOException {
     * Send the response status line.

     */
     */
    public void sendStatus() {

            // Send the connector a request for commit. The connector should
            // then validate the headers, send them (using sendHeader) and 
            // set the filters accordingly.
            response.action(ActionCode.ACTION_COMMIT, null);

        }

        if (finished)
            return;

        if (lastActiveFilter != -1)
            activeFilters[lastActiveFilter].end();

        flushBuffer();

        finished = true;

    }


    // ------------------------------------------------ HTTP/1.1 Output Methods


    /**
     * Send an acknoledgement.
     */
    public void sendAck()
        throws IOException {

        if (!committed) {
            //Socket.send(socket, Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length) < 0
            ByteBuffer buf = ByteBuffer.wrap(Constants.ACK_BYTES,0,Constants.ACK_BYTES.length);    
            writeToSocket(buf,false);
        }

    }

    private synchronized void writeToSocket(ByteBuffer bytebuffer, boolean flip) throws IOException {
        int limit = bytebuffer.position();
        if ( flip ) bytebuffer.flip();
        while ( bytebuffer.hasRemaining() ) {
            int written = socket.write(bytebuffer);
        }
        bbuf.clear();
        // Write protocol name
        write(Constants.HTTP_11_BYTES);
        buf[pos++] = Constants.SP;

        // Write status code
        int status = response.getStatus();
        switch (status) {
        case 200:
            write(Constants._200_BYTES);
            break;
        case 400:
            write(Constants._400_BYTES);
            break;
        case 404:
            write(Constants._404_BYTES);
            break;
        default:
            write(status);
        }

        buf[pos++] = Constants.SP;

        // Write message
        String message = response.getMessage();
        if (message == null) {
            write(HttpMessages.getMessage(status));
        } else {
            write(message);
        }

        // End the response status line
        buf[pos++] = Constants.CR;
        buf[pos++] = Constants.LF;

    }


    /**
     * Send a header.
     * 
     * @param name Header name
     * @param value Header value
     */
    public void sendHeader(MessageBytes name, MessageBytes value) {

        write(name);
        buf[pos++] = Constants.COLON;
        buf[pos++] = Constants.SP;
        write(value);
        buf[pos++] = Constants.CR;
        buf[pos++] = Constants.LF;

    }


    /**
     * Send a header.
     * 
     * @param name Header name
     * @param value Header value
     * 
    public void sendHeader(ByteChunk name, ByteChunk value) {

        write(name);
        buf[pos++] = Constants.COLON;
        buf[pos++] = Constants.SP;
        write(value);
        buf[pos++] = Constants.CR;
        buf[pos++] = Constants.LF;

    }


    /**
     * Send a header.
     * 
     * @param name Header name
     * @param value Header value
     */
    public void sendHeader(String name, String value) {

        write(name);
        buf[pos++] = Constants.COLON;
        buf[pos++] = Constants.SP;
        write(value);
        buf[pos++] = Constants.CR;
        buf[pos++] = Constants.LF;

    }


    /**
     * End the header block.
     */
    public void endHeaders() {

        buf[pos++] = Constants.CR;
        buf[pos++] = Constants.LF;

    }


    // --------------------------------------------------- OutputBuffer Methods


    /**
     * Write the contents of a byte chunk.
     * 
     * @param chunk byte chunk
     * @return number of bytes written
     * @throws IOException an undelying I/O error occured
     */
    public int doWrite(ByteChunk chunk, Response res) 
        throws IOException {

        if (!committed) {

            // Send the connector a request for commit. The connector should
            // then validate the headers, send them (using sendHeaders) and 
            // set the filters accordingly.
            response.action(ActionCode.ACTION_COMMIT, null);

        }

        if (lastActiveFilter == -1)
            return outputStreamOutputBuffer.doWrite(chunk, res);
        else
            return activeFilters[lastActiveFilter].doWrite(chunk, res);

    }


    // ------------------------------------------------------ Protected Methods


    /**
     * Commit the response.
     * 
     * @throws IOException an undelying I/O error occured
     */
    protected void commit()
        throws IOException {

        // The response is now committed
        committed = true;
        response.setCommitted(true);

        if (pos > 0) {
            // Sending the response header buffer
            addToBB(buf, 0, pos);
        }

    }

    int total = 0;
    private synchronized void addToBB(byte[] buf, int offset, int length) throws IOException {
        try {
            if (bbuf.capacity() <= (offset + length)) {
                flushBuffer();
            }
            bbuf.put(buf, offset, length);
            total += length;
        }catch ( Exception x ) {
            x.printStackTrace();
        }
        //System.out.println("Total:"+total);
    }


    /**
     * This method will write the contents of the specyfied message bytes 
     * buffer to the output stream, without filtering. This method is meant to
     * be used to write the response header.
     * @param mb data to be written
     */
    protected void write(MessageBytes mb) {

        if (mb.getType() == MessageBytes.T_BYTES) {
            ByteChunk bc = mb.getByteChunk();
            write(bc);
        } else if (mb.getType() == MessageBytes.T_CHARS) {
            CharChunk cc = mb.getCharChunk();
            write(cc);
        } else {
            write(mb.toString());
        }

    }


    /**
     * This method will write the contents of the specyfied message bytes 
     * buffer to the output stream, without filtering. This method is meant to
     * be used to write the response header.
     * 
     * @param bc data to be written
     */
    protected void write(ByteChunk bc) {

        // Writing the byte chunk to the output buffer
        System.arraycopy(bc.getBytes(), bc.getStart(), buf, pos,
                         bc.getLength());
        pos = pos + bc.getLength();

    }


    /**
     * This method will write the contents of the specyfied char 
     * buffer to the output stream, without filtering. This method is meant to
     * be used to write the response header.
     * 
     * @param cc data to be written
     */
    protected void write(CharChunk cc) {

        int start = cc.getStart();
        int end = cc.getEnd();
        char[] cbuf = cc.getBuffer();
        for (int i = start; i < end; i++) {
            char c = cbuf[i];
            // Note:  This is clearly incorrect for many strings,
            // but is the only consistent approach within the current
            // servlet framework.  It must suffice until servlet output
            // streams properly encode their output.
            if ((c <= 31) && (c != 9)) {
                c = ' ';
            } else if (c == 127) {
                c = ' ';
            }
            buf[pos++] = (byte) c;
        }

    }


    /**
     * This method will write the contents of the specyfied byte 
     * buffer to the output stream, without filtering. This method is meant to
     * be used to write the response header.
     * 
     * @param b data to be written
     */
    public void write(byte[] b) {

        // Writing the byte chunk to the output buffer
        System.arraycopy(b, 0, buf, pos, b.length);
        pos = pos + b.length;

    }


    /**
     * This method will write the contents of the specyfied String to the 
     * output stream, without filtering. This method is meant to be used to 
     * write the response header.
     * 
     * @param s data to be written
     */
    protected void write(String s) {

        if (s == null)
            return;

        // From the Tomcat 3.3 HTTP/1.0 connector
        int len = s.length();
        for (int i = 0; i < len; i++) {
            char c = s.charAt (i);
            // Note:  This is clearly incorrect for many strings,
            // but is the only consistent approach within the current
            // servlet framework.  It must suffice until servlet output
            // streams properly encode their output.
            if ((c <= 31) && (c != 9)) {
                c = ' ';
            } else if (c == 127) {
                c = ' ';
            }
                if (bbuf.position() == bbuf.capacity()) {
            buf[pos++] = (byte) c;
        }

    }


    /**
     */
     * This method will print the specified integer to the output stream, 
     * without filtering. This method is meant to be used to write the 
     * response header.
     * 
     * @param i data to be written
     */
    protected void write(int i) {

        write(String.valueOf(i));

    }


    /**
     * Callback to write data from the buffer.
     */
    protected void flushBuffer()
        throws IOException {

        //prevent timeout for async,
        SelectionKey key = socket.keyFor(selector);
        if (key != null) {
            NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
            attach.access();
        }

        //write to the socket, if there is anything to write
        if (bbuf.position() > 0) {
            writeToSocket(bbuf,true);
        }
    }


    // ----------------------------------- OutputStreamOutputBuffer Inner Class


    /**
     * This class is an output buffer which will write data to an output
     * stream.
     */
    protected class SocketOutputBuffer 
        implements OutputBuffer {


        /**
         * Write chunk.
         */
        public int doWrite(ByteChunk chunk, Response res) 
            throws IOException {

            int len = chunk.getLength();
            int start = chunk.getStart();
            byte[] b = chunk.getBuffer();
            while (len > 0) {
                int thisTime = len;
                    flushBuffer();
                }
                if (thisTime > bbuf.capacity() - bbuf.position()) {
                    thisTime = bbuf.capacity() - bbuf.position();
                }
                addToBB(b,start,thisTime);
                len = len - thisTime;
                start = start + thisTime;
            }
            return chunk.getLength();

        }


    }


}
=======
/*
 *  Copyright 1999-2004 The Apache Software Foundation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package org.apache.coyote.http11;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

import org.apache.coyote.ActionCode;
import org.apache.coyote.OutputBuffer;
import org.apache.coyote.Response;
import org.apache.tomcat.util.buf.ByteChunk;
import org.apache.tomcat.util.buf.CharChunk;
import org.apache.tomcat.util.buf.MessageBytes;
import org.apache.tomcat.util.http.HttpMessages;
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.res.StringManager;
        } else {
import java.nio.channels.SelectionKey;
import org.apache.tomcat.util.net.NioEndpoint;
import java.nio.channels.Selector;

/**
 * Output buffer.
 * 
 * @author Remy Maucherat
 * @author Filip Hanik
 */
public class InternalNioOutputBuffer 
    implements OutputBuffer {


    // -------------------------------------------------------------- Constants


    // ----------------------------------------------------------- Constructors
    int bbufLimit = 0;
    
    Selector selector;

    /**
     * Default constructor.
     */
    public InternalNioOutputBuffer(Response response) {
        this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE);
    }


    /**
     * Alternate constructor.
     */
    public InternalNioOutputBuffer(Response response, int headerBufferSize) {

        this.response = response;
        headers = response.getMimeHeaders();

        buf = new byte[headerBufferSize];
        
        if (headerBufferSize < (8 * 1024)) {
            bbufLimit = 6 * 1500;    
        } else {
            bbufLimit = (headerBufferSize / 1500 + 1) * 1500;
        }
        bbuf = ByteBuffer.allocateDirect(bbufLimit);

        outputStreamOutputBuffer = new SocketOutputBuffer();

        filterLibrary = new OutputFilter[0];
        activeFilters = new OutputFilter[0];
        lastActiveFilter = -1;

        committed = false;
        finished = false;

        // Cause loading of HttpMessages
        HttpMessages.getMessage(200);

    }


    // -------------------------------------------------------------- Variables


    /**
     * The string manager for this package.
     */
    protected static StringManager sm =
        StringManager.getManager(Constants.Package);


    // ----------------------------------------------------- Instance Variables


    /**
     * Associated Coyote response.
     */
    protected Response response;


    /**
     * Headers of the associated request.
     */
    protected MimeHeaders headers;


    /**
     * Committed flag.
     */
    protected boolean committed;


    /**
     * Finished flag.
     */
    protected boolean finished;


    /**
     * Pointer to the current write buffer.
     */
    protected byte[] buf;


    /**
     * Position in the buffer.
     */
    protected int pos;


    /**
     * Underlying socket.
     */
    protected SocketChannel socket;


    /**
     * Underlying output buffer.

    protected OutputBuffer outputStreamOutputBuffer;


    /**
     * Filter library.
     * Note: Filter[0] is always the "chunked" filter.
     */
    protected OutputFilter[] filterLibrary;


    /**
     * Active filter (which is actually the top of the pipeline).
     */
    protected OutputFilter[] activeFilters;


    /**
     * Index of the last active filter.
     */
    protected int lastActiveFilter;


    /**
     * Direct byte buffer used for writing.
     */
    protected ByteBuffer bbuf = null;


    // ------------------------------------------------------------- Properties


    /**
     * Set the underlying socket.
     */
    public void setSocket(SocketChannel socket) {
        this.socket = socket;
    }

    public void setSelector(Selector selector) {
        this.selector = selector;
    }

    /**
     * Get the underlying socket input stream.
     */
    public SocketChannel getSocket() {
        return socket;
    }
    /**
     * Set the socket buffer size.
     */
    public void setSocketBuffer(int socketBufferSize) {
        // FIXME: Remove
    }


    /**
     * Add an output filter to the filter library.
     */
    public void addFilter(OutputFilter filter) {

        OutputFilter[] newFilterLibrary = 
            new OutputFilter[filterLibrary.length + 1];
        for (int i = 0; i < filterLibrary.length; i++) {
            newFilterLibrary[i] = filterLibrary[i];
        }
        newFilterLibrary[filterLibrary.length] = filter;
        filterLibrary = newFilterLibrary;

        activeFilters = new OutputFilter[filterLibrary.length];

    }


    /**
     * Get filters.
     */
    public OutputFilter[] getFilters() {

        return filterLibrary;

    }


    /**
     * Clear filters.
     */
    public void clearFilters() {

        filterLibrary = new OutputFilter[0];
        lastActiveFilter = -1;

    }


    /**
     * Add an output filter to the filter library.
     */
    public void addActiveFilter(OutputFilter filter) {

        if (lastActiveFilter == -1) {
            filter.setBuffer(outputStreamOutputBuffer);
        } else {
            for (int i = 0; i <= lastActiveFilter; i++) {
                if (activeFilters[i] == filter)
                    return;
            }
            filter.setBuffer(activeFilters[lastActiveFilter]);
        }

        activeFilters[++lastActiveFilter] = filter;

        filter.setResponse(response);

    }


    // --------------------------------------------------------- Public Methods
    /**
    /**
     * Flush the response.
     * 
     * @throws IOException an undelying I/O error occured
     */
    public void flush()
        throws IOException {

        if (!committed) {

            // Send the connector a request for commit. The connector should
            // then validate the headers, send them (using sendHeader) and 
            // set the filters accordingly.
            response.action(ActionCode.ACTION_COMMIT, null);

        }

        // Flush the current buffer
        flushBuffer();

    }


    /**
     * Reset current response.
     * 
     * @throws IllegalStateException if the response has already been committed
     */
    public void reset() {

        if (committed)
            throw new IllegalStateException(/*FIXME:Put an error message*/);

        // Recycle Request object
        response.recycle();

    }


    /**
     * Recycle the output buffer. This should be called when closing the 
     * connection.
     */
    public void recycle() {

        // Recycle Request object
        response.recycle();
        bbuf.clear();

        socket = null;
        pos = 0;
        lastActiveFilter = -1;
        committed = false;
        finished = false;

    }


    /**
        if (s == null)
     * End processing of current HTTP request.
     * Note: All bytes of the current request should have been already 
     * consumed. This method only resets all the pointers so that we are ready
     * to parse the next HTTP request.
     */
    public void nextRequest() {

        // Recycle Request object
        response.recycle();

        // Recycle filters
        for (int i = 0; i <= lastActiveFilter; i++) {
            activeFilters[i].recycle();
        }

        // Reset pointers
        pos = 0;
        lastActiveFilter = -1;
        committed = false;
        finished = false;

    }


    /**
     * End request.
     * 
     * @throws IOException an undelying I/O error occured
     */
    public void endRequest()
        throws IOException {

        if (!committed) {

            // Send the connector a request for commit. The connector should
            // then validate the headers, send them (using sendHeader) and 
            // set the filters accordingly.
            response.action(ActionCode.ACTION_COMMIT, null);

        }

        if (finished)
            return;

        if (lastActiveFilter != -1)
            activeFilters[lastActiveFilter].end();

        flushBuffer();

        finished = true;

    }


    // ------------------------------------------------ HTTP/1.1 Output Methods


            write(HttpMessages.getMessage(status));
     * Send an acknoledgement.
     */
    public void sendAck()
        throws IOException {

        if (!committed) {
            //Socket.send(socket, Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length) < 0
            ByteBuffer buf = ByteBuffer.wrap(Constants.ACK_BYTES,0,Constants.ACK_BYTES.length);    
            writeToSocket(buf,false);
        }

    }

    private synchronized void writeToSocket(ByteBuffer bytebuffer, boolean flip) throws IOException {
        int limit = bytebuffer.position();
        if ( flip ) bytebuffer.flip();
        while ( bytebuffer.hasRemaining() ) {
            int written = socket.write(bytebuffer);
        }
        bbuf.clear();
        this.total = 0;
    } 


    /**
     * Send the response status line.
     */
    public void sendStatus() {

        // Write protocol name
        write(Constants.HTTP_11_BYTES);
        buf[pos++] = Constants.SP;

        // Write status code
        int status = response.getStatus();
        switch (status) {
        case 200:
            write(Constants._200_BYTES);
            break;
        case 400:
            write(Constants._400_BYTES);
            break;
        case 404:
            write(Constants._404_BYTES);
            break;
        default:
            write(status);
        }

        buf[pos++] = Constants.SP;

        // Write message
        String message = response.getMessage();
        if (message == null) {
            write(message);
        }

        // End the response status line
        buf[pos++] = Constants.CR;
        buf[pos++] = Constants.LF;

    }


    /**
     * Send a header.
     * 
     * @param name Header name
     * @param value Header value
     */
    public void sendHeader(MessageBytes name, MessageBytes value) {

        write(name);
        buf[pos++] = Constants.COLON;
        buf[pos++] = Constants.SP;
        write(value);
        buf[pos++] = Constants.CR;
        buf[pos++] = Constants.LF;

    }


    /**
     * Send a header.
     * 
     * @param name Header name
     * @param value Header value
     */
    public void sendHeader(ByteChunk name, ByteChunk value) {

        write(name);
        buf[pos++] = Constants.COLON;
        buf[pos++] = Constants.SP;
        write(value);
        buf[pos++] = Constants.CR;
        buf[pos++] = Constants.LF;

    }


    /**
     * Send a header.
     * 
     * @param name Header name
     * @param value Header value
     */
    public void sendHeader(String name, String value) {

        write(name);
        buf[pos++] = Constants.COLON;
        buf[pos++] = Constants.SP;
        write(value);
            return;
        buf[pos++] = Constants.CR;
        buf[pos++] = Constants.LF;

    }


    }
     * @throws IOException an undelying I/O error occured
     */
    protected void commit()
        throws IOException {

        // The response is now committed
        committed = true;
        response.setCommitted(true);

        if (pos > 0) {


    /**
     * End the header block.
     */
    public void endHeaders() {

        buf[pos++] = Constants.CR;
        buf[pos++] = Constants.LF;

    }


    // --------------------------------------------------- OutputBuffer Methods


    /**
     * Write the contents of a byte chunk.
     * 
     * @param chunk byte chunk
     * @return number of bytes written
     * @throws IOException an undelying I/O error occured
     */
    public int doWrite(ByteChunk chunk, Response res) 
        throws IOException {

        if (!committed) {

            // Send the connector a request for commit. The connector should
            // then validate the headers, send them (using sendHeaders) and 
            // set the filters accordingly.
            response.action(ActionCode.ACTION_COMMIT, null);

        }

        if (lastActiveFilter == -1)
            return outputStreamOutputBuffer.doWrite(chunk, res);
        else
            return activeFilters[lastActiveFilter].doWrite(chunk, res);

    }


    // ------------------------------------------------------ Protected Methods


    /**
     * Commit the response.
     * 
            // Sending the response header buffer
            addToBB(buf, 0, pos);
        }

    }

    int total = 0;
    private synchronized void addToBB(byte[] buf, int offset, int length) throws IOException {
        if (bbuf.capacity() <= (offset + length)) {
            flushBuffer();
        }
        bbuf.put(buf, offset, length);
        total += length;
    }


    /**
     * This method will write the contents of the specyfied message bytes 
     * buffer to the output stream, without filtering. This method is meant to
     * be used to write the response header.
     * 
     * @param mb data to be written
     */
    protected void write(MessageBytes mb) {

        if (mb.getType() == MessageBytes.T_BYTES) {
            ByteChunk bc = mb.getByteChunk();
            write(bc);
        } else if (mb.getType() == MessageBytes.T_CHARS) {
            CharChunk cc = mb.getCharChunk();
            write(cc);
        } else {
            write(mb.toString());
        }

    }


    /**
     * This method will write the contents of the specyfied message bytes 
     * buffer to the output stream, without filtering. This method is meant to
     * be used to write the response header.
     * 
     * @param bc data to be written
     */
    protected void write(ByteChunk bc) {

        // Writing the byte chunk to the output buffer
        System.arraycopy(bc.getBytes(), bc.getStart(), buf, pos,
                         bc.getLength());
        pos = pos + bc.getLength();

    /**
     * output stream, without filtering. This method is meant to be used to 
     * write the response header.
     * 
     * @param s data to be written
     */
    protected void write(String s) {
        }
     * This method will write the contents of the specyfied char 
     * buffer to the output stream, without filtering. This method is meant to
     * be used to write the response header.
     * 
     * @param cc data to be written
     */
    protected void write(CharChunk cc) {

        int start = cc.getStart();
        int end = cc.getEnd();
        char[] cbuf = cc.getBuffer();
        for (int i = start; i < end; i++) {
            char c = cbuf[i];
            // Note:  This is clearly incorrect for many strings,
            // but is the only consistent approach within the current
            // servlet framework.  It must suffice until servlet output
            // streams properly encode their output.
            if ((c <= 31) && (c != 9)) {
                c = ' ';
            } else if (c == 127) {
                c = ' ';
            }
            buf[pos++] = (byte) c;
        }

    }


    /**
     * This method will write the contents of the specyfied byte 
     * buffer to the output stream, without filtering. This method is meant to
     * be used to write the response header.
     * 
     * @param b data to be written
     */
    public void write(byte[] b) {

        // Writing the byte chunk to the output buffer
        System.arraycopy(b, 0, buf, pos, b.length);
        pos = pos + b.length;

    }


    /**
     * This method will write the contents of the specyfied String to the 


        // From the Tomcat 3.3 HTTP/1.0 connector
        int len = s.length();
        for (int i = 0; i < len; i++) {
            char c = s.charAt (i);
            // Note:  This is clearly incorrect for many strings,
            // but is the only consistent approach within the current
            // servlet framework.  It must suffice until servlet output
            // streams properly encode their output.
            if ((c <= 31) && (c != 9)) {
                c = ' ';
            } else if (c == 127) {
                c = ' ';
            }
            buf[pos++] = (byte) c;
        }

    }


    /**
     * This method will print the specified integer to the output stream, 
     * without filtering. This method is meant to be used to write the 
     * response header.
     * 
     * @param i data to be written
     */
    protected void write(int i) {

        write(String.valueOf(i));

    }


    /**
     * Callback to write data from the buffer.
     */
    protected void flushBuffer()
        throws IOException {

        //prevent timeout for async,
        SelectionKey key = socket.keyFor(selector);
        if (key != null) {
            NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
            attach.access();
        }

        //write to the socket, if there is anything to write
        if (bbuf.position() > 0) {
            writeToSocket(bbuf,true);
    }


    // ----------------------------------- OutputStreamOutputBuffer Inner Class


    /**
     * This class is an output buffer which will write data to an output
     * stream.
     */
    protected class SocketOutputBuffer 
        implements OutputBuffer {


        /**
         * Write chunk.
         */
        public int doWrite(ByteChunk chunk, Response res) 
            throws IOException {

            int len = chunk.getLength();
            int start = chunk.getStart();
            byte[] b = chunk.getBuffer();
            while (len > 0) {
                int thisTime = len;
                if (bbuf.position() == bbuf.capacity()) {
                    flushBuffer();
                }
                if (thisTime > bbuf.capacity() - bbuf.position()) {
                    thisTime = bbuf.capacity() - bbuf.position();
                }
                addToBB(b,start,thisTime);
                len = len - thisTime;
                start = start + thisTime;
            }
            return chunk.getLength();

        }


    }


}
>>>>>>> c705e291c22dbbe2c8e25822fd1911a4481ef618
Solution content
    /**
        write(name);
/*
 *  Copyright 1999-2004 The Apache Software Foundation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package org.apache.coyote.http11;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

import org.apache.coyote.ActionCode;
import org.apache.coyote.OutputBuffer;
import org.apache.coyote.Response;
import org.apache.tomcat.util.buf.ByteChunk;
import org.apache.tomcat.util.buf.CharChunk;
import org.apache.tomcat.util.buf.MessageBytes;
import org.apache.tomcat.util.http.HttpMessages;

    /**
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.res.StringManager;
import java.nio.channels.SelectionKey;
import org.apache.tomcat.util.net.NioEndpoint;
import java.nio.channels.Selector;

/**
 * Output buffer.
 * 
 * @author Remy Maucherat
 * @author Filip Hanik
 */
public class InternalNioOutputBuffer 
    implements OutputBuffer {


    // -------------------------------------------------------------- Constants


    // ----------------------------------------------------------- Constructors
    int bbufLimit = 0;
    
    Selector selector;

    /**
     * Default constructor.
     */
    public InternalNioOutputBuffer(Response response) {
        this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE);
    }


    /**
     * Alternate constructor.
     */
    public InternalNioOutputBuffer(Response response, int headerBufferSize) {

        this.response = response;
        headers = response.getMimeHeaders();

        buf = new byte[headerBufferSize];
        
        if (headerBufferSize < (8 * 1024)) {
            bbufLimit = 6 * 1500;    
        } else {
            bbufLimit = (headerBufferSize / 1500 + 1) * 1500;
        }
        bbuf = ByteBuffer.allocateDirect(bbufLimit);

        outputStreamOutputBuffer = new SocketOutputBuffer();

        filterLibrary = new OutputFilter[0];
        activeFilters = new OutputFilter[0];
        lastActiveFilter = -1;

        committed = false;
        finished = false;

        // Cause loading of HttpMessages
        HttpMessages.getMessage(200);

    }


    // -------------------------------------------------------------- Variables

     * The string manager for this package.
     */
    protected static StringManager sm =
        StringManager.getManager(Constants.Package);


    // ----------------------------------------------------- Instance Variables


    /**
     * Associated Coyote response.
     */
    protected Response response;


    /**
     * Headers of the associated request.
     */
    protected MimeHeaders headers;


    /**
     * Committed flag.
     */
    protected boolean committed;


    /**
     * Finished flag.
     */
    protected boolean finished;


    /**
     * Pointer to the current write buffer.
     */
    protected byte[] buf;


    /**
     * Position in the buffer.
     */
    protected int pos;


    /**
     * Underlying socket.
     */
    protected SocketChannel socket;


     * Underlying output buffer.
     */
    protected OutputBuffer outputStreamOutputBuffer;


    /**
     * Filter library.
     * Note: Filter[0] is always the "chunked" filter.
     */
    protected OutputFilter[] filterLibrary;


    /**
     * Active filter (which is actually the top of the pipeline).
     */
    protected OutputFilter[] activeFilters;


    /**
     * Index of the last active filter.
     */
    protected int lastActiveFilter;


    /**
     * Direct byte buffer used for writing.
     */
    protected ByteBuffer bbuf = null;


    // ------------------------------------------------------------- Properties


    /**
     * Set the underlying socket.
     */
    public void setSocket(SocketChannel socket) {
        this.socket = socket;
    }

    public void setSelector(Selector selector) {
        this.selector = selector;
    }

    /**
     * Get the underlying socket input stream.
     */
    public SocketChannel getSocket() {
        return socket;
    }
    /**
     * Set the socket buffer size.
     */
    public void setSocketBuffer(int socketBufferSize) {
        // FIXME: Remove
    }


    /**
     * Add an output filter to the filter library.
     */
    public void addFilter(OutputFilter filter) {

        OutputFilter[] newFilterLibrary = 
            new OutputFilter[filterLibrary.length + 1];
        for (int i = 0; i < filterLibrary.length; i++) {
            newFilterLibrary[i] = filterLibrary[i];
        }
        newFilterLibrary[filterLibrary.length] = filter;
        filterLibrary = newFilterLibrary;
        activeFilters = new OutputFilter[filterLibrary.length];

    }


    /**
     * Get filters.
     */
    public OutputFilter[] getFilters() {

        return filterLibrary;

    }


    /**
     * Clear filters.
     */
    public void clearFilters() {

        filterLibrary = new OutputFilter[0];
        lastActiveFilter = -1;

    }


    /**
     * Add an output filter to the filter library.
     */
    public void addActiveFilter(OutputFilter filter) {

        if (lastActiveFilter == -1) {
            filter.setBuffer(outputStreamOutputBuffer);
        } else {
            for (int i = 0; i <= lastActiveFilter; i++) {
                if (activeFilters[i] == filter)
                    return;
            }
            filter.setBuffer(activeFilters[lastActiveFilter]);
        }

        activeFilters[++lastActiveFilter] = filter;

        filter.setResponse(response);

    }


    // --------------------------------------------------------- Public Methods


    /**
     * Flush the response.
     * 
     * @throws IOException an undelying I/O error occured
     */
    public void flush()
        throws IOException {

        if (!committed) {

            // Send the connector a request for commit. The connector should
            // then validate the headers, send them (using sendHeader) and 
            // set the filters accordingly.
            response.action(ActionCode.ACTION_COMMIT, null);

        }

        // Flush the current buffer
        flushBuffer();

    }


    /**
     * Reset current response.
     * 
     * @throws IllegalStateException if the response has already been committed
     */
    public void reset() {

        if (committed)
            throw new IllegalStateException(/*FIXME:Put an error message*/);

        // Recycle Request object
        response.recycle();

    }


    /**
     * Recycle the output buffer. This should be called when closing the 
     * connection.
     */
    public void recycle() {

        // Recycle Request object
        response.recycle();
        bbuf.clear();

        socket = null;
        pos = 0;
        lastActiveFilter = -1;
        committed = false;
        finished = false;

    }


    /**
     * End processing of current HTTP request.
     * Note: All bytes of the current request should have been already 
     * consumed. This method only resets all the pointers so that we are ready
     * to parse the next HTTP request.
     */
    public void nextRequest() {

        // Recycle Request object
        response.recycle();

        // Recycle filters
        for (int i = 0; i <= lastActiveFilter; i++) {
            activeFilters[i].recycle();
        }

        // Reset pointers
        pos = 0;
        lastActiveFilter = -1;
        committed = false;
        finished = false;

    }


    /**
     * End request.
     * 
     * @throws IOException an undelying I/O error occured
     */
    public void endRequest()
        throws IOException {

        if (!committed) {

            // Send the connector a request for commit. The connector should
            // then validate the headers, send them (using sendHeader) and 
            // set the filters accordingly.
            response.action(ActionCode.ACTION_COMMIT, null);

        }

        if (finished)
            return;

        if (lastActiveFilter != -1)
            activeFilters[lastActiveFilter].end();

        flushBuffer();

        finished = true;

    }


    // ------------------------------------------------ HTTP/1.1 Output Methods


    /**
     * Send an acknoledgement.
     */
    public void sendAck()
        throws IOException {

        if (!committed) {
            //Socket.send(socket, Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length) < 0
            ByteBuffer buf = ByteBuffer.wrap(Constants.ACK_BYTES,0,Constants.ACK_BYTES.length);    
            writeToSocket(buf,false);
        }

    }

    private synchronized void writeToSocket(ByteBuffer bytebuffer, boolean flip) throws IOException {
        int limit = bytebuffer.position();
        if ( flip ) bytebuffer.flip();
        while ( bytebuffer.hasRemaining() ) {
            int written = socket.write(bytebuffer);
        }
        bbuf.clear();
        this.total = 0;
    } 


    /**
     * Send the response status line.
     */
    public void sendStatus() {

        // Write protocol name
        write(Constants.HTTP_11_BYTES);
        buf[pos++] = Constants.SP;

        // Write status code
        int status = response.getStatus();
        switch (status) {
        case 200:
            write(Constants._200_BYTES);
            break;
        case 400:
            write(Constants._400_BYTES);
            break;
        case 404:
            write(Constants._404_BYTES);
            break;
        default:
            write(status);
        }

        buf[pos++] = Constants.SP;

        // Write message
        String message = response.getMessage();
        if (message == null) {
            write(HttpMessages.getMessage(status));
        } else {
            write(message);
        }

        // End the response status line
        buf[pos++] = Constants.CR;
        buf[pos++] = Constants.LF;

    }


    /**
     * Send a header.
     * 
     * @param name Header name
     * @param value Header value
     */
    public void sendHeader(MessageBytes name, MessageBytes value) {

        write(name);
        buf[pos++] = Constants.COLON;
        buf[pos++] = Constants.SP;
        write(value);
        buf[pos++] = Constants.CR;
        buf[pos++] = Constants.LF;

    }


    /**
     * Send a header.
     * 
     * @param name Header name
     * @param value Header value
     */
    public void sendHeader(ByteChunk name, ByteChunk value) {

        write(name);
        buf[pos++] = Constants.COLON;
        buf[pos++] = Constants.SP;
        write(value);
        buf[pos++] = Constants.CR;
        buf[pos++] = Constants.LF;

    }


    /**
     * Send a header.
     * 
     * @param name Header name
     * @param value Header value
     */
    public void sendHeader(String name, String value) {

        buf[pos++] = Constants.COLON;
        buf[pos++] = Constants.SP;
        write(value);
        buf[pos++] = Constants.CR;
        buf[pos++] = Constants.LF;

    }


    /**
     * End the header block.
     */
    public void endHeaders() {

        buf[pos++] = Constants.CR;
        buf[pos++] = Constants.LF;

    }


    // --------------------------------------------------- OutputBuffer Methods


    /**
     * Write the contents of a byte chunk.
     * 
     * @param chunk byte chunk
     * @return number of bytes written
     * @throws IOException an undelying I/O error occured
     */
    public int doWrite(ByteChunk chunk, Response res) 
        throws IOException {

        if (!committed) {

            // Send the connector a request for commit. The connector should
            // then validate the headers, send them (using sendHeaders) and 
            // set the filters accordingly.
            response.action(ActionCode.ACTION_COMMIT, null);

        }

        if (lastActiveFilter == -1)
            return outputStreamOutputBuffer.doWrite(chunk, res);
        else
            return activeFilters[lastActiveFilter].doWrite(chunk, res);

    }


    // ------------------------------------------------------ Protected Methods


    /**
     * Commit the response.
     * 
     * @throws IOException an undelying I/O error occured
     */
    protected void commit()
        throws IOException {

        // The response is now committed
        committed = true;
        response.setCommitted(true);

        if (pos > 0) {
            // Sending the response header buffer
            addToBB(buf, 0, pos);
        }

    }

    int total = 0;
    private synchronized void addToBB(byte[] buf, int offset, int length) throws IOException {
        if (bbuf.capacity() <= (offset + length)) {
            flushBuffer();
        }
        bbuf.put(buf, offset, length);
        total += length;
    }


    /**
     * This method will write the contents of the specyfied message bytes 
     * buffer to the output stream, without filtering. This method is meant to
     * be used to write the response header.
     * 
     * @param mb data to be written
     */
    protected void write(MessageBytes mb) {

        if (mb.getType() == MessageBytes.T_BYTES) {
            ByteChunk bc = mb.getByteChunk();
            write(bc);
        } else if (mb.getType() == MessageBytes.T_CHARS) {
            CharChunk cc = mb.getCharChunk();
            write(cc);
        } else {
            write(mb.toString());
        }

    }


    /**
     * This method will write the contents of the specyfied message bytes 
     * buffer to the output stream, without filtering. This method is meant to
     * be used to write the response header.
     * 
     * @param bc data to be written
     */
    protected void write(ByteChunk bc) {

        // Writing the byte chunk to the output buffer
        System.arraycopy(bc.getBytes(), bc.getStart(), buf, pos,
                         bc.getLength());
        pos = pos + bc.getLength();

    }


    /**
     * This method will write the contents of the specyfied char 
     * buffer to the output stream, without filtering. This method is meant to
     * be used to write the response header.
     * 
     * @param cc data to be written
     */
    protected void write(CharChunk cc) {

        int start = cc.getStart();
        int end = cc.getEnd();
        char[] cbuf = cc.getBuffer();
        for (int i = start; i < end; i++) {
            char c = cbuf[i];
            // Note:  This is clearly incorrect for many strings,
            // but is the only consistent approach within the current
            // servlet framework.  It must suffice until servlet output
            // streams properly encode their output.
            if ((c <= 31) && (c != 9)) {
                c = ' ';
            } else if (c == 127) {
                c = ' ';
            }
            buf[pos++] = (byte) c;
        }

    }


    /**
     * This method will write the contents of the specyfied byte 
     * buffer to the output stream, without filtering. This method is meant to
     * be used to write the response header.
     * 
     * @param b data to be written
     */
    public void write(byte[] b) {

        // Writing the byte chunk to the output buffer
        System.arraycopy(b, 0, buf, pos, b.length);
        pos = pos + b.length;

    }


    /**
     * This method will write the contents of the specyfied String to the 
     * output stream, without filtering. This method is meant to be used to 
     * write the response header.
     * 
     * @param s data to be written
     */
    protected void write(String s) {

        if (s == null)
            return;

        // From the Tomcat 3.3 HTTP/1.0 connector
        int len = s.length();
        for (int i = 0; i < len; i++) {
            char c = s.charAt (i);
            // Note:  This is clearly incorrect for many strings,
            // but is the only consistent approach within the current
            // servlet framework.  It must suffice until servlet output
            // streams properly encode their output.
            if ((c <= 31) && (c != 9)) {
                c = ' ';
            } else if (c == 127) {
                c = ' ';
            }
            buf[pos++] = (byte) c;
        }

    }


    /**
     * This method will print the specified integer to the output stream, 
     * without filtering. This method is meant to be used to write the 
     * response header.
     * 
     * @param i data to be written
     */
    protected void write(int i) {

        write(String.valueOf(i));

    }


    /**
     * Callback to write data from the buffer.
     */
    protected void flushBuffer()
        throws IOException {

        //prevent timeout for async,
        SelectionKey key = socket.keyFor(selector);
        if (key != null) {
            NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
            attach.access();
        }

        //write to the socket, if there is anything to write
        if (bbuf.position() > 0) {
            writeToSocket(bbuf,true);
        }
    }


    // ----------------------------------- OutputStreamOutputBuffer Inner Class


    /**
     * This class is an output buffer which will write data to an output
     * stream.
     */
    protected class SocketOutputBuffer 
        implements OutputBuffer {


        /**
         * Write chunk.
         */
        public int doWrite(ByteChunk chunk, Response res) 
            throws IOException {

            int len = chunk.getLength();
            int start = chunk.getStart();
            byte[] b = chunk.getBuffer();
            while (len > 0) {
                int thisTime = len;
                if (bbuf.position() == bbuf.capacity()) {
                    flushBuffer();
                }
                if (thisTime > bbuf.capacity() - bbuf.position()) {
                    thisTime = bbuf.capacity() - bbuf.position();
                }
                addToBB(b,start,thisTime);
                len = len - thisTime;
                start = start + thisTime;
            }
            return chunk.getLength();

        }


    }


}
File
InternalNioOutputBuffer.java
Developer's decision
Version 2
Kind of conflict
Class declaration
Comment
Import
Package declaration
Chunk
Conflicting content
<<<<<<< HEAD
/*
 *  Copyright 2005-2006 The Apache Software Foundation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package org.apache.tomcat.util.net;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.Executor;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tomcat.jni.Error;
import org.apache.tomcat.jni.Library;
import org.apache.tomcat.jni.Poll;
import org.apache.tomcat.jni.SSL;
import org.apache.tomcat.jni.Status;
import org.apache.tomcat.util.res.StringManager;

/**
 * NIO tailored thread pool, providing the following services:
 * 
    *
  • Socket acceptor thread
  • *
  • Socket poller thread
  • *
  • Worker threads pool
  • *
* * When switching to Java 5, there's an opportunity to use the virtual * machine's thread pool. * * @author Mladen Turk * @author Remy Maucherat * @author Filip Hanik */ public class NioEndpoint { /** /** /** /** // -------------------------------------------------------------- Constants protected static Log log = LogFactory.getLog(NioEndpoint.class); protected static StringManager sm = StringManager.getManager("org.apache.tomcat.util.net.res"); /** * The Request attribute key for the cipher suite. */ public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite"; /** * The Request attribute key for the key size. */ public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size"; /** * The Request attribute key for the client certificate chain. */ public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate"; /** * The Request attribute key for the session id. * This one is a Tomcat extension to the Servlet spec. */ public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session"; // ----------------------------------------------------------------- Fields /** * Available workers. */ protected WorkerStack workers = null; /** * Running state of the endpoint. */ protected volatile boolean running = false; /** * Will be set to true whenever the endpoint is paused. */ protected volatile boolean paused = false; /** * Track the initialization state of the endpoint. */ protected boolean initialized = false; /** * Current worker threads busy count. */ protected int curThreadsBusy = 0; /** * Current worker threads count. */ protected int curThreads = 0; /** * Sequence number used to generate thread names. */ protected int sequence = 0; /** * Root APR memory pool. */ protected long rootPool = 0; /** * Server socket "pointer". */ protected ServerSocketChannel serverSock = null; /** * APR memory pool for the server socket. */ protected long serverSockPool = 0; /** * SSL context. */ protected long sslContext = 0; // ------------------------------------------------------------- Properties /** * External Executor based thread pool. */ protected Executor executor = null; public void setExecutor(Executor executor) { this.executor = executor; } public Executor getExecutor() { return executor; } /** * Maximum amount of worker threads. */ protected int maxThreads = 40; public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; } /** public int getMaxThreads() { return maxThreads; } /** * Priority of the acceptor and poller threads. */ protected int threadPriority = Thread.NORM_PRIORITY; public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; } public int getThreadPriority() { return threadPriority; } /** * Size of the socket poller. */ protected int pollerSize = 8 * 1024; public void setPollerSize(int pollerSize) { this.pollerSize = pollerSize; } public int getPollerSize() { return pollerSize; } /** * Server socket port. */ protected int port; public int getPort() { return port; } public void setPort(int port ) { this.port=port; } /** * Address for the server socket. */ protected InetAddress address; public InetAddress getAddress() { return address; } public void setAddress(InetAddress address) { this.address = address; } /** * Handling of accepted sockets. */ protected Handler handler = null; public void setHandler(Handler handler ) { this.handler = handler; } public Handler getHandler() { return handler; } /** * Allows the server developer to specify the backlog that * should be used for server sockets. By default, this value * is 100. */ protected int backlog = 100; public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; } public int getBacklog() { return backlog; } /** * Socket TCP no delay. */ protected boolean tcpNoDelay = false; public boolean getTcpNoDelay() { return tcpNoDelay; } public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; } /** * Socket linger. */ protected int soLinger = 100; public int getSoLinger() { return soLinger; } public void setSoLinger(int soLinger) { this.soLinger = soLinger; } /** * Socket timeout. */ protected int soTimeout = -1; public int getSoTimeout() { return soTimeout; } public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; } /** * Timeout on first request read before going to the poller, in ms. */ protected int firstReadTimeout = 60000; public int getFirstReadTimeout() { return firstReadTimeout; } public void setFirstReadTimeout(int firstReadTimeout) { this.firstReadTimeout = firstReadTimeout; } /** * Poll interval, in microseconds. The smaller the value, the more CPU the poller * will use, but the more responsive to activity it will be. */ protected int pollTime = 2000; public int getPollTime() { return pollTime; } public void setPollTime(int pollTime) { if (pollTime > 0) { this.pollTime = pollTime; } } /** * The default is true - the created threads will be * in daemon mode. If set to false, the control thread * will not be daemon - and will keep the process alive. */ protected boolean daemon = true; public void setDaemon(boolean b) { daemon = b; } public boolean getDaemon() { return daemon; } return poller; } public void setSSLCertificateFile(String SSLCertificateFile) { this.SSLCertificateFile = SSLCertificateFile; } * Name of the thread pool, which will be used for naming child threads. */ protected String name = "TP"; public void setName(String name) { this.name = name; } public String getName() { return name; } /** * Allow comet request handling. */ protected boolean useComet = true; public void setUseComet(boolean useComet) { this.useComet = useComet; } public boolean getUseComet() { return useComet; } /** * Acceptor thread count. */ protected int acceptorThreadCount = 0; public void setAcceptorThreadCount(int acceptorThreadCount) { this.acceptorThreadCount = acceptorThreadCount; } public int getAcceptorThreadCount() { return acceptorThreadCount; } /** * Poller thread count. */ protected int pollerThreadCount = 0; public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadCount = pollerThreadCount; } public int getPollerThreadCount() { return pollerThreadCount; } protected long selectorTimeout = 5000; public void setSelectorTimeout(long timeout){ this.selectorTimeout = timeout;} public long getSelectorTimeout(){ return this.selectorTimeout; } /** * The socket poller. */ protected Poller[] pollers = null; protected int pollerRoundRobin = 0; public Poller getPoller() { pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length; Poller poller = pollers[pollerRoundRobin]; return poller; } /** * The socket poller used for Comet support. */ public Poller getCometPoller() { Poller poller = getPoller(); /** * Dummy maxSpareThreads property. */ public int getMaxSpareThreads() { return 0; } /** * Dummy minSpareThreads property. */ public int getMinSpareThreads() { return 0; } /** * SSL engine. */ protected String SSLEngine = "off"; public String getSSLEngine() { return SSLEngine; } public void setSSLEngine(String SSLEngine) { this.SSLEngine = SSLEngine; } /** * SSL protocols. */ protected String SSLProtocol = "all"; public String getSSLProtocol() { return SSLProtocol; } public void setSSLProtocol(String SSLProtocol) { this.SSLProtocol = SSLProtocol; } /** * SSL password (if a cert is encrypted, and no password has been provided, a callback * will ask for a password). */ protected String SSLPassword = null; public String getSSLPassword() { return SSLPassword; } public void setSSLPassword(String SSLPassword) { this.SSLPassword = SSLPassword; } /** * SSL cipher suite. */ protected String SSLCipherSuite = "ALL"; public String getSSLCipherSuite() { return SSLCipherSuite; } public void setSSLCipherSuite(String SSLCipherSuite) { this.SSLCipherSuite = SSLCipherSuite; } /** * SSL certificate file. */ protected String SSLCertificateFile = null; public String getSSLCertificateFile() { return SSLCertificateFile; } */ * SSL certificate key file. */ protected String SSLCertificateKeyFile = null; public String getSSLCertificateKeyFile() { return SSLCertificateKeyFile; } public void setSSLCertificateKeyFile(String SSLCertificateKeyFile) { this.SSLCertificateKeyFile = SSLCertificateKeyFile; } /** * SSL certificate chain file. */ protected String SSLCertificateChainFile = null; public String getSSLCertificateChainFile() { return SSLCertificateChainFile; } public void setSSLCertificateChainFile(String SSLCertificateChainFile) { this.SSLCertificateChainFile = SSLCertificateChainFile; } /** * SSL CA certificate path. */ protected String SSLCACertificatePath = null; public String getSSLCACertificatePath() { return SSLCACertificatePath; } public void setSSLCACertificatePath(String SSLCACertificatePath) { this.SSLCACertificatePath = SSLCACertificatePath; } /** * SSL CA certificate file. */ protected String SSLCACertificateFile = null; public String getSSLCACertificateFile() { return SSLCACertificateFile; } public void setSSLCACertificateFile(String SSLCACertificateFile) { this.SSLCACertificateFile = SSLCACertificateFile; } /** * SSL CA revocation path. */ protected String SSLCARevocationPath = null; public String getSSLCARevocationPath() { return SSLCARevocationPath; } public void setSSLCARevocationPath(String SSLCARevocationPath) { this.SSLCARevocationPath = SSLCARevocationPath; } /** * SSL CA revocation file. */ protected String SSLCARevocationFile = null; public String getSSLCARevocationFile() { return SSLCARevocationFile; } public void setSSLCARevocationFile(String SSLCARevocationFile) { this.SSLCARevocationFile = SSLCARevocationFile; } /** * SSL verify client. protected String SSLVerifyClient = "none"; public String getSSLVerifyClient() { return SSLVerifyClient; } public void setSSLVerifyClient(String SSLVerifyClient) { this.SSLVerifyClient = SSLVerifyClient; } /** * SSL verify depth. */ protected int SSLVerifyDepth = 10; public int getSSLVerifyDepth() { return SSLVerifyDepth; } public void setSSLVerifyDepth(int SSLVerifyDepth) { this.SSLVerifyDepth = SSLVerifyDepth; } // --------------------------------------------------------- Public Methods /** * Number of keepalive sockets. */ public int getKeepAliveCount() { if (pollers == null) { return 0; } else { int keepAliveCount = 0; for (int i = 0; i < pollers.length; i++) { keepAliveCount += pollers[i].getKeepAliveCount(); } return keepAliveCount; } } /** * Return the amount of threads that are managed by the pool. * * @return the amount of threads that are managed by the pool */ public int getCurrentThreadCount() { return curThreads; } /** * Return the amount of threads currently busy. * * @return the amount of threads currently busy */ public int getCurrentThreadsBusy() { return curThreadsBusy; } } * Return the state of the endpoint. * * @return true if the endpoint is running, false otherwise */ public boolean isRunning() { return running; } /** * Return the state of the endpoint. * * @return true if the endpoint is paused, false otherwise */ public boolean isPaused() { return paused; } // ----------------------------------------------- Public Lifecycle Methods /** * Initialize the endpoint. */ public void init() throws Exception { if (initialized) return; serverSock = ServerSocketChannel.open(); InetSocketAddress addr = (address!=null?new InetSocketAddress(address,port):new InetSocketAddress(port)); serverSock.socket().bind(addr,100); //todo, set backlog value serverSock.configureBlocking(true); //mimic APR behavior // Initialize thread count defaults for acceptor, poller and sendfile if (acceptorThreadCount == 0) { // FIXME: Doesn't seem to work that well with multiple accept threads acceptorThreadCount = 1; } if (pollerThreadCount != 1) { // limit to one poller, no need for others pollerThreadCount = 1; } // Initialize SSL if needed if (!"off".equalsIgnoreCase(SSLEngine)) { // Initialize SSL // FIXME: one per VM call ? if ("on".equalsIgnoreCase(SSLEngine)) { SSL.initialize(null); } else { SSL.initialize(SSLEngine); // SSL protocol int value = SSL.SSL_PROTOCOL_ALL; if ("SSLv2".equalsIgnoreCase(SSLProtocol)) { value = SSL.SSL_PROTOCOL_SSLV2; } else if ("SSLv3".equalsIgnoreCase(SSLProtocol)) { value = SSL.SSL_PROTOCOL_SSLV3; } else if ("TLSv1".equalsIgnoreCase(SSLProtocol)) { value = SSL.SSL_PROTOCOL_TLSV1; } else if ("SSLv2+SSLv3".equalsIgnoreCase(SSLProtocol)) { value = SSL.SSL_PROTOCOL_SSLV2 | SSL.SSL_PROTOCOL_SSLV3; } // // Create SSL Context // sslContext = SSLContext.make(rootPool, value, SSL.SSL_MODE_SERVER); // // List the ciphers that the client is permitted to negotiate // SSLContext.setCipherSuite(sslContext, SSLCipherSuite); // // Load Server key and certificate // SSLContext.setCertificate(sslContext, SSLCertificateFile, SSLCertificateKeyFile, SSLPassword, SSL.SSL_AIDX_RSA); // // Set certificate chain file // SSLContext.setCertificateChainFile(sslContext, SSLCertificateChainFile, false); // // Support Client Certificates // SSLContext.setCACertificate(sslContext, SSLCACertificateFile, SSLCACertificatePath); // // Set revocation // SSLContext.setCARevocation(sslContext, SSLCARevocationFile, SSLCARevocationPath); // // Client certificate verification // value = SSL.SSL_CVERIFY_NONE; // if ("optional".equalsIgnoreCase(SSLVerifyClient)) { // value = SSL.SSL_CVERIFY_OPTIONAL; // } else if ("require".equalsIgnoreCase(SSLVerifyClient)) { // value = SSL.SSL_CVERIFY_REQUIRE; // } else if ("optionalNoCA".equalsIgnoreCase(SSLVerifyClient)) { // value = SSL.SSL_CVERIFY_OPTIONAL_NO_CA; // } // SSLContext.setVerify(sslContext, value, SSLVerifyDepth); } initialized = true; } if (address == null) { * Start the APR endpoint, creating acceptor, poller threads. */ public void start() throws Exception { // Initialize socket if not done before if (!initialized) { init(); } if (!running) { running = true; paused = false; // Create worker collection if (executor == null) { workers = new WorkerStack(maxThreads); } // Start acceptor threads for (int i = 0; i < acceptorThreadCount; i++) { Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i); acceptorThread.setPriority(threadPriority); acceptorThread.setDaemon(daemon); acceptorThread.start(); } // Start poller threads pollers = new Poller[pollerThreadCount]; for (int i = 0; i < pollerThreadCount; i++) { pollers[i] = new Poller(); pollers[i].init(); Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i); pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); pollerThread.start(); } } } /** * Pause the endpoint, which will make it stop accepting new sockets. */ public void pause() { if (running && !paused) { paused = true; unlockAccept(); } } /** * Resume the endpoint, which will make it start accepting new sockets * again. */ public void resume() { if (running) { paused = false; } } /** * Stop the endpoint. This will cause all processing threads to stop. */ public void stop() { if (running) { running = false; unlockAccept(); for (int i = 0; i < pollers.length; i++) { pollers[i].destroy(); } pollers = null; } } /** * Deallocate APR memory pools, and close server socket. */ public void destroy() throws Exception { if (running) { stop(); } // Close server socket serverSock.socket().close(); serverSock.close(); serverSock = null; sslContext = 0; initialized = false; } // ------------------------------------------------------ Protected Methods /** * Get a sequence number used for thread naming. */ protected int getSequence() { return sequence++; } /** * Unlock the server socket accept using a bugus connection. */ protected void unlockAccept() { java.net.Socket s = null; try { // Need to create a connection to unlock the accept(); // return false; // } */ s = new java.net.Socket("127.0.0.1", port); } else { s = new java.net.Socket(address, port); // setting soLinger to a small value will help shutdown the // connection quicker s.setSoLinger(true, 0); } } catch(Exception e) { if (log.isDebugEnabled()) { log.debug(sm.getString("endpoint.debug.unlock", "" + port), e); } } finally { if (s != null) { try { s.close(); } catch (Exception e) { // Ignore } } } } /** * Process the specified connection. */ protected boolean setSocketOptions(SocketChannel socket) { // Process the connection int step = 1; try { //disable blocking, APR style, we are gonna be polling it socket.configureBlocking(false); // 1: Set socket options: timeout, linger, etc if (soLinger >= 0) socket.socket().setSoLinger(true,soLinger); if (tcpNoDelay) socket.socket().setTcpNoDelay(true); if (soTimeout > 0) socket.socket().setSoTimeout(soTimeout); // 2: SSL handshake step = 2; if (sslContext != 0) { // SSLSocket.attach(sslContext, socket); // if (SSLSocket.handshake(socket) != 0) { // if (log.isDebugEnabled()) { // log.debug(sm.getString("endpoint.err.handshake") + ": " + SSL.getLastError()); // } } getPoller().register(socket); } catch (Throwable t) { if (log.isDebugEnabled()) { if (step == 2) { log.debug(sm.getString("endpoint.err.handshake"), t); } else { log.debug(sm.getString("endpoint.err.unexpected"), t); } } // Tell to close the socket return false; } return true; } /** * Create (or allocate) and return an available processor for use in * processing a specific HTTP request, if possible. If the maximum * allowed processors have already been created and are in use, return * null instead. */ protected Worker createWorkerThread() { synchronized (workers) { if (workers.size() > 0) { curThreadsBusy++; return (workers.pop()); } if ((maxThreads > 0) && (curThreads < maxThreads)) { curThreadsBusy++; return (newWorkerThread()); } else { if (maxThreads < 0) { curThreadsBusy++; return (newWorkerThread()); } else { return (null); } } } } /** * Create and return a new processor suitable for processing HTTP * requests and returning the corresponding responses. protected class Acceptor implements Runnable { protected Worker newWorkerThread() { Worker workerThread = new Worker(); workerThread.start(); return (workerThread); } /** * Return a new worker thread, and block while to worker is available. */ protected Worker getWorkerThread() { // Allocate a new worker thread Worker workerThread = createWorkerThread(); while (workerThread == null) { try { synchronized (workers) { workers.wait(); } } catch (InterruptedException e) { // Ignore } workerThread = createWorkerThread(); } return workerThread; } /** * Recycle the specified Processor so that it can be used again. * * @param workerThread The processor to be recycled */ protected void recycleWorkerThread(Worker workerThread) { synchronized (workers) { workers.push(workerThread); curThreadsBusy--; workers.notify(); } } /** * Allocate a new poller of the specified size. */ protected long allocatePoller(int size, long pool, int timeout) { try { return Poll.create(size, pool, 0, timeout * 1000); } catch (Error e) { if (Status.APR_STATUS_IS_EINVAL(e.getError())) { log.info(sm.getString("endpoint.poll.limitedpollsize", "" + size)); return 0; } else { log.error(sm.getString("endpoint.poll.initfail"), e); return -1; } } } /** * Process given socket. */ protected boolean processSocket(SocketChannel socket) { try { if (executor == null) { getWorkerThread().assign(socket); } else { executor.execute(new SocketProcessor(socket)); } } catch (Throwable t) { // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full log.error(sm.getString("endpoint.process.fail"), t); return false; } return true; } /** * Process given socket for an event. */ protected boolean processSocket(SocketChannel socket, boolean error) { try { if (executor == null) { getWorkerThread().assign(socket, error); } else { executor.execute(new SocketEventProcessor(socket, error)); } } catch (Throwable t) { // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full log.error(sm.getString("endpoint.process.fail"), t); return false; } return true; } // --------------------------------------------------- Acceptor Inner Class /** * Server socket acceptor thread. */ * The background thread that listens for incoming TCP/IP connections and * hands them off to an appropriate processor. */ public void run() { // Loop until we receive a shutdown command while (running) { // Loop if endpoint is paused while (paused) { try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore } } try { // Accept the next incoming connection from the server socket SocketChannel socket = serverSock.accept(); // Hand this socket off to an appropriate processor if(!setSocketOptions(socket)) { // Close socket right away socket.socket().close(); socket.close(); } } catch (Throwable t) { log.error(sm.getString("endpoint.accept.fail"), t); } // The processor will recycle itself when it finishes } } } // ----------------------------------------------------- Poller Inner Class /** * Poller class. */ public class Poller implements Runnable { protected Selector selector; protected LinkedList events = new LinkedList(); protected boolean close = false; protected long nextExpiration = 0;//optimize expiration handling protected int keepAliveCount = 0; public int getKeepAliveCount() { return keepAliveCount; } public Poller() throws IOException { this.selector = Selector.open(); } public Selector getSelector() { return selector;} /** * Create the poller. With some versions of APR, the maximum poller size will * be 62 (reocmpiling APR is necessary to remove this limitation). */ protected void init() { keepAliveCount = 0; } /** * Destroy the poller. */ protected void destroy() { // Wait for polltime before doing anything, so that the poller threads // exit, otherwise parallel descturction of sockets which are still // in the poller can cause problems try { synchronized (this) { this.wait(pollTime / 1000); } } catch (InterruptedException e) { // Ignore } close = true; } public void addEvent(Runnable event) { synchronized (events) { events.add(event); } selector.wakeup(); } /** * Add specified socket and associated pool to the poller. The socket will * be added to a temporary array, and polled first after a maximum amount * of time equal to pollTime (in most cases, latency will be much lower, * however). * } catch (Exception x) { * @param socket to add to the poller */ public void add(final SocketChannel socket) { final SelectionKey key = socket.keyFor(selector); KeyAttachment att = (KeyAttachment)key.attachment(); if ( att != null ) att.setWakeUp(false); Runnable r = new Runnable() { public void run() { try { if (key != null) key.interestOps(SelectionKey.OP_READ); }catch ( CancelledKeyException ckx ) { try { if ( key != null && key.attachment() != null ) { KeyAttachment ka = (KeyAttachment)key.attachment(); ka.setError(true); //set to collect this socket immediately } socket.socket().close(); socket.close(); } catch ( Exception ignore ) {} } } }; addEvent(r); } public void events() { synchronized (events) { Runnable r = null; while ( (events.size() > 0) && (r = events.removeFirst()) != null ) { try { r.run(); } catch ( Exception x ) { log.error("",x); } } events.clear(); } } public void register(final SocketChannel socket) { SelectionKey key = socket.keyFor(selector); Runnable r = new Runnable() { public void run() { try { socket.register(selector, SelectionKey.OP_READ, new KeyAttachment()); log.error("", x); } } }; synchronized (events) { events.add(r); } selector.wakeup(); } public void cancelledKey(SelectionKey key) { try { KeyAttachment ka = (KeyAttachment) key.attachment(); key.cancel(); if (ka != null && ka.getComet()) processSocket( (SocketChannel) key.channel(), true); key.channel().close(); } catch (IOException e) { if ( log.isDebugEnabled() ) log.debug("",e); // Ignore } } /** * The background thread that listens for incoming TCP/IP connections and * hands them off to an appropriate processor. */ public void run() { // Loop until we receive a shutdown command while (running) { // Loop if endpoint is paused while (paused) { try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore } } events(); // Time to terminate? if (close) return; int keyCount = 0; try { keyCount = selector.select(selectorTimeout); } catch (Throwable x) { log.error("",x); continue; } //if (keyCount == 0) continue; Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) { SelectionKey sk = (SelectionKey) iterator.next(); iterator.remove(); KeyAttachment attachment = (KeyAttachment)sk.attachment(); try { if(attachment == null) attachment = new KeyAttachment(); attachment.access(); sk.attach(attachment); int readyOps = sk.readyOps(); sk.interestOps(sk.interestOps() & ~readyOps); SocketChannel channel = (SocketChannel)sk.channel(); boolean read = sk.isReadable(); if (read) { if ( attachment.getWakeUp() ) { attachment.setWakeUp(false); synchronized (attachment.getMutex()) {attachment.getMutex().notifyAll();} } else if ( attachment.getComet() ) { if (!processSocket(channel,false)) processSocket(channel,true); } else { boolean close = (!processSocket(channel)); if ( close ) { channel.socket().close(); channel.close(); } } } if (sk.isValid() && sk.isWritable()) { } } catch ( CancelledKeyException ckx ) { if (attachment!=null && attachment.getComet()) processSocket( (SocketChannel) sk.channel(), true); try { sk.channel().close(); }catch ( Exception ignore){} } catch (Throwable t) { log.error("",t); } }//while //process timeouts timeout(); } synchronized (this) { this.notifyAll(); } } protected void timeout() { long now = System.currentTimeMillis(); if ( now < nextExpiration ) return; nextExpiration = now + (long)soTimeout; //timeout Set keys = selector.keys(); for (Iterator iter = keys.iterator(); iter.hasNext(); ) { SelectionKey key = iter.next(); try { KeyAttachment ka = (KeyAttachment) key.attachment(); if ( ka == null ) { cancelledKey(key); //we don't support any keys without attachments } else if ( ka.getError() ) { cancelledKey(key); }else if ((key.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) { //only timeout sockets that we are waiting for a read from long delta = now - ka.getLastAccess(); long timeout = (ka.getTimeout()==-1)?((long) soTimeout):(ka.getTimeout()); boolean isTimedout = delta > timeout; if (isTimedout) { cancelledKey(key); } else { long nextTime = now+(timeout-delta); nextExpiration = (nextTime < nextExpiration)?nextTime:nextExpiration; } }//end if }catch ( CancelledKeyException ckx ) { cancelledKey(key); } }//for } } public static class KeyAttachment { public long getLastAccess() { return lastAccess; } public void access() { access(System.currentTimeMillis()); } public void access(long access) { lastAccess = access; } public void setComet(boolean comet) { this.comet = comet; } public boolean getComet() { return comet; } public boolean getCurrentAccess() { return currentAccess; } public void setCurrentAccess(boolean access) { currentAccess = access; } public boolean getWakeUp() { return wakeUp; } public void setWakeUp(boolean wakeUp) { this.wakeUp = wakeUp; } public Object getMutex() {return mutex;} public void setTimeout(long timeout) {this.timeout = timeout;} public long getTimeout() {return this.timeout;} public boolean getError() { return error; } public void setError(boolean error) { this.error = error; } protected Object mutex = new Object(); protected boolean wakeUp = false; protected long lastAccess = System.currentTimeMillis(); protected boolean currentAccess = false; protected boolean comet = false; protected long timeout = -1; protected boolean error = false; } // ----------------------------------------------------- Worker Inner Class /** * Server processor class. */ protected class Worker implements Runnable { protected Thread thread = null; protected boolean available = false; protected SocketChannel socket = null; protected boolean event = false; protected boolean error = false; /** * Process an incoming TCP/IP connection on the specified socket. Any * exception that occurs during processing must be logged and swallowed. * NOTE: This method is called from our Connector's thread. We * must assign it to our own thread so that multiple simultaneous * requests can be handled. * * @param socket TCP socket to process */ protected synchronized void assign(SocketChannel socket) { // Wait for the Processor to get the previous Socket while (available) { try { wait(); } catch (InterruptedException e) { } } // Store the newly available Socket and notify our thread this.socket = socket; event = false; error = false; available = true; notifyAll(); } protected synchronized void assign(SocketChannel socket, boolean error) { // Wait for the Processor to get the previous Socket while (available) { try { wait(); } catch (InterruptedException e) { } } // Store the newly available Socket and notify our thread this.socket = socket; event = true; this.error = error; available = true; notifyAll(); } /** * Await a newly assigned Socket from our Connector, or null * if we are supposed to shut down. */ protected synchronized SocketChannel await() { * @param object the object to be appended to the queue (first element). // Wait for the Connector to provide a new Socket while (!available) { try { wait(); } catch (InterruptedException e) { } } // Notify the Connector that we have received this Socket SocketChannel socket = this.socket; available = false; notifyAll(); return (socket); } /** * The background thread that listens for incoming TCP/IP connections and * hands them off to an appropriate processor. */ public void run() { // Process requests until we receive a shutdown signal while (running) { // Wait for the next socket to be assigned SocketChannel socket = await(); if (socket == null) continue; // Process the request from this socket if ((event) && (handler.event(socket, error) == Handler.SocketState.CLOSED)) { // Close socket and pool try { socket.socket().close(); socket.close(); }catch ( Exception x ) { log.error("",x); } } else if ((!event) && (handler.process(socket) == Handler.SocketState.CLOSED)) { // Close socket and pool try { socket.socket().close(); socket.close(); }catch ( Exception x ) { log.error("",x); } } // Finish up this request recycleWorkerThread(this); } } /** * Start the background processing thread. */ public void start() { thread = new Thread(this); thread.setName(getName() + "-" + (++curThreads)); thread.setDaemon(true); thread.start(); } } // ------------------------------------------------ Handler Inner Interface /** * Bare bones interface used for socket processing. Per thread data is to be * stored in the ThreadWithAttributes extra folders, or alternately in * thread local fields. */ public interface Handler { public enum SocketState { OPEN, CLOSED, LONG } public SocketState process(SocketChannel socket); public SocketState event(SocketChannel socket, boolean error); } // ------------------------------------------------- WorkerStack Inner Class public class WorkerStack { protected Worker[] workers = null; protected int end = 0; public WorkerStack(int size) { workers = new Worker[size]; } /** * Put the object into the queue. * // Process the request from this socket */ public void push(Worker worker) { workers[end++] = worker; } /** * Get the first object out of the queue. Return null if the queue * is empty. */ public Worker pop() { if (end > 0) { return workers[--end]; } return null; } /** * Get the first object out of the queue, Return null if the queue * is empty. */ public Worker peek() { return workers[end]; } /** * Is the queue empty? */ public boolean isEmpty() { return (end == 0); } /** * How many elements are there in this queue? */ public int size() { return (end); } } // ---------------------------------------------- SocketProcessor Inner Class /** * This class is the equivalent of the Worker, but will simply use in an * external Executor thread pool. */ protected class SocketProcessor implements Runnable { protected SocketChannel socket = null; public SocketProcessor(SocketChannel socket) { this.socket = socket; } public void run() { if (handler.process(socket) == Handler.SocketState.CLOSED) { // Close socket and pool try { socket.socket().close(); socket.close(); } catch ( Exception x ) { log.error("",x); } socket = null; } } } // --------------------------------------- SocketEventProcessor Inner Class /** * This class is the equivalent of the Worker, but will simply use in an * external Executor thread pool. */ protected class SocketEventProcessor implements Runnable { protected SocketChannel socket = null; protected boolean error = false; public SocketEventProcessor(SocketChannel socket, boolean error) { this.socket = socket; this.error = error; } public void run() { // Process the request from this socket if (handler.event(socket, error) == Handler.SocketState.CLOSED) { // Close socket and pool try { socket.socket().close(); socket.close(); } catch ( Exception x ) { log.error("",x); } socket = null; } } } } ======= /** /** /* * Copyright 2005-2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.tomcat.util.net; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.channels.CancelledKeyException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.Set; import java.util.concurrent.Executor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tomcat.jni.Error; import org.apache.tomcat.jni.Library; import org.apache.tomcat.jni.Poll; import org.apache.tomcat.jni.SSL; import org.apache.tomcat.jni.Status; import org.apache.tomcat.util.res.StringManager; /** * NIO tailored thread pool, providing the following services: *
    *
  • Socket acceptor thread
  • *
  • Socket poller thread
  • *
  • Worker threads pool
  • *
* * When switching to Java 5, there's an opportunity to use the virtual * machine's thread pool. * * @author Mladen Turk * @author Remy Maucherat * @author Filip Hanik */ public class NioEndpoint { // -------------------------------------------------------------- Constants protected static Log log = LogFactory.getLog(NioEndpoint.class); protected static StringManager sm = StringManager.getManager("org.apache.tomcat.util.net.res"); /** * The Request attribute key for the cipher suite. */ public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite"; /** * The Request attribute key for the key size. */ public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size"; /** * The Request attribute key for the client certificate chain. */ public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate"; /** * The Request attribute key for the session id. * This one is a Tomcat extension to the Servlet spec. */ public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session"; // ----------------------------------------------------------------- Fields /** * Available workers. */ protected WorkerStack workers = null; /** * Running state of the endpoint. */ protected volatile boolean running = false; public Executor getExecutor() { return executor; } public void setExecutor(Executor executor) { this.executor = executor; } * Will be set to true whenever the endpoint is paused. */ protected volatile boolean paused = false; /** * Track the initialization state of the endpoint. */ protected boolean initialized = false; /** * Current worker threads busy count. */ protected int curThreadsBusy = 0; /** * Current worker threads count. */ protected int curThreads = 0; /** * Sequence number used to generate thread names. */ protected int sequence = 0; /** * Root APR memory pool. */ protected long rootPool = 0; /** * Server socket "pointer". */ protected ServerSocketChannel serverSock = null; /** * APR memory pool for the server socket. */ protected long serverSockPool = 0; /** * SSL context. */ protected long sslContext = 0; // ------------------------------------------------------------- Properties /** * External Executor based thread pool. */ protected Executor executor = null; * Maximum amount of worker threads. */ protected int maxThreads = 40; public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; } public int getMaxThreads() { return maxThreads; } /** * Priority of the acceptor and poller threads. */ protected int threadPriority = Thread.NORM_PRIORITY; public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; } public int getThreadPriority() { return threadPriority; } /** * Size of the socket poller. */ protected int pollerSize = 8 * 1024; public void setPollerSize(int pollerSize) { this.pollerSize = pollerSize; } public int getPollerSize() { return pollerSize; } /** * Server socket port. */ protected int port; public int getPort() { return port; } public void setPort(int port ) { this.port=port; } /** * Address for the server socket. */ protected InetAddress address; public InetAddress getAddress() { return address; } public void setAddress(InetAddress address) { this.address = address; } /** * Handling of accepted sockets. */ protected Handler handler = null; public void setHandler(Handler handler ) { this.handler = handler; } public Handler getHandler() { return handler; } /** * Allows the server developer to specify the backlog that Poller poller = pollers[pollerRoundRobin]; return poller; } * should be used for server sockets. By default, this value * is 100. */ protected int backlog = 100; public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; } public int getBacklog() { return backlog; } /** * Socket TCP no delay. */ protected boolean tcpNoDelay = false; public boolean getTcpNoDelay() { return tcpNoDelay; } public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; } /** * Socket linger. */ protected int soLinger = 100; public int getSoLinger() { return soLinger; } public void setSoLinger(int soLinger) { this.soLinger = soLinger; } /** * Socket timeout. */ protected int soTimeout = -1; public int getSoTimeout() { return soTimeout; } public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; } /** * Timeout on first request read before going to the poller, in ms. */ protected int firstReadTimeout = 60000; public int getFirstReadTimeout() { return firstReadTimeout; } public void setFirstReadTimeout(int firstReadTimeout) { this.firstReadTimeout = firstReadTimeout; } /** * Poll interval, in microseconds. The smaller the value, the more CPU the poller * will use, but the more responsive to activity it will be. */ protected int pollTime = 2000; public int getPollTime() { return pollTime; } public void setPollTime(int pollTime) { if (pollTime > 0) { this.pollTime = pollTime; } } /** * The default is true - the created threads will be * in daemon mode. If set to false, the control thread * will not be daemon - and will keep the process alive. */ protected boolean daemon = true; public void setDaemon(boolean b) { daemon = b; } public boolean getDaemon() { return daemon; } /** * Name of the thread pool, which will be used for naming child threads. */ protected String name = "TP"; public void setName(String name) { this.name = name; } public String getName() { return name; } /** * Allow comet request handling. */ protected boolean useComet = true; public void setUseComet(boolean useComet) { this.useComet = useComet; } public boolean getUseComet() { return useComet; } /** * Acceptor thread count. */ protected int acceptorThreadCount = 0; public void setAcceptorThreadCount(int acceptorThreadCount) { this.acceptorThreadCount = acceptorThreadCount; } public int getAcceptorThreadCount() { return acceptorThreadCount; } /** * Poller thread count. */ protected int pollerThreadCount = 0; public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadCount = pollerThreadCount; } public int getPollerThreadCount() { return pollerThreadCount; } protected long selectorTimeout = 1000; public void setSelectorTimeout(long timeout){ this.selectorTimeout = timeout;} public long getSelectorTimeout(){ return this.selectorTimeout; } /** * The socket poller. */ protected Poller[] pollers = null; protected int pollerRoundRobin = 0; public Poller getPoller() { pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length; } /** * The socket poller used for Comet support. protected String SSLCARevocationFile = null; /** */ public Poller getCometPoller() { Poller poller = getPoller(); return poller; } /** * Dummy maxSpareThreads property. */ public int getMaxSpareThreads() { return 0; } /** * Dummy minSpareThreads property. */ public int getMinSpareThreads() { return 0; } /** * SSL engine. */ protected String SSLEngine = "off"; public String getSSLEngine() { return SSLEngine; } public void setSSLEngine(String SSLEngine) { this.SSLEngine = SSLEngine; } /** * SSL protocols. */ protected String SSLProtocol = "all"; public String getSSLProtocol() { return SSLProtocol; } public void setSSLProtocol(String SSLProtocol) { this.SSLProtocol = SSLProtocol; } /** * SSL password (if a cert is encrypted, and no password has been provided, a callback * will ask for a password). */ protected String SSLPassword = null; public String getSSLPassword() { return SSLPassword; } public void setSSLPassword(String SSLPassword) { this.SSLPassword = SSLPassword; } /** * SSL cipher suite. */ protected String SSLCipherSuite = "ALL"; public String getSSLCipherSuite() { return SSLCipherSuite; } public void setSSLCipherSuite(String SSLCipherSuite) { this.SSLCipherSuite = SSLCipherSuite; } /** * SSL certificate file. */ protected String SSLCertificateFile = null; public String getSSLCertificateFile() { return SSLCertificateFile; } public void setSSLCertificateFile(String SSLCertificateFile) { this.SSLCertificateFile = SSLCertificateFile; } /** * SSL certificate key file. */ protected String SSLCertificateKeyFile = null; public String getSSLCertificateKeyFile() { return SSLCertificateKeyFile; } public void setSSLCertificateKeyFile(String SSLCertificateKeyFile) { this.SSLCertificateKeyFile = SSLCertificateKeyFile; } /** * SSL certificate chain file. */ protected String SSLCertificateChainFile = null; public String getSSLCertificateChainFile() { return SSLCertificateChainFile; } public void setSSLCertificateChainFile(String SSLCertificateChainFile) { this.SSLCertificateChainFile = SSLCertificateChainFile; } /** * SSL CA certificate path. */ protected String SSLCACertificatePath = null; public String getSSLCACertificatePath() { return SSLCACertificatePath; } public void setSSLCACertificatePath(String SSLCACertificatePath) { this.SSLCACertificatePath = SSLCACertificatePath; } /** * SSL CA certificate file. */ protected String SSLCACertificateFile = null; public String getSSLCACertificateFile() { return SSLCACertificateFile; } public void setSSLCACertificateFile(String SSLCACertificateFile) { this.SSLCACertificateFile = SSLCACertificateFile; } /** * SSL CA revocation path. */ protected String SSLCARevocationPath = null; public String getSSLCARevocationPath() { return SSLCARevocationPath; } public void setSSLCARevocationPath(String SSLCARevocationPath) { this.SSLCARevocationPath = SSLCARevocationPath; } /** * SSL CA revocation file. */ public String getSSLCARevocationFile() { return SSLCARevocationFile; } public void setSSLCARevocationFile(String SSLCARevocationFile) { this.SSLCARevocationFile = SSLCARevocationFile; } /** * SSL verify client. */ protected String SSLVerifyClient = "none"; public String getSSLVerifyClient() { return SSLVerifyClient; } public void setSSLVerifyClient(String SSLVerifyClient) { this.SSLVerifyClient = SSLVerifyClient; } /** * SSL verify depth. */ protected int SSLVerifyDepth = 10; public int getSSLVerifyDepth() { return SSLVerifyDepth; } public void setSSLVerifyDepth(int SSLVerifyDepth) { this.SSLVerifyDepth = SSLVerifyDepth; } // --------------------------------------------------------- Public Methods /** * Number of keepalive sockets. */ public int getKeepAliveCount() { if (pollers == null) { return 0; } else { int keepAliveCount = 0; for (int i = 0; i < pollers.length; i++) { keepAliveCount += pollers[i].getKeepAliveCount(); } return keepAliveCount; } } /** * Return the amount of threads that are managed by the pool. * * @return the amount of threads that are managed by the pool */ public int getCurrentThreadCount() { return curThreads; } /** * Return the amount of threads currently busy. * * @return the amount of threads currently busy */ public int getCurrentThreadsBusy() { return curThreadsBusy; } /** * Return the state of the endpoint. * * @return true if the endpoint is running, false otherwise */ public boolean isRunning() { return running; } /** * Return the state of the endpoint. * * @return true if the endpoint is paused, false otherwise */ public boolean isPaused() { return paused; } // ----------------------------------------------- Public Lifecycle Methods /** * Initialize the endpoint. */ public void init() throws Exception { if (initialized) return; serverSock = ServerSocketChannel.open(); InetSocketAddress addr = (address!=null?new InetSocketAddress(address,port):new InetSocketAddress(port)); serverSock.socket().bind(addr,100); //todo, set backlog value serverSock.configureBlocking(true); //mimic APR behavior // Initialize thread count defaults for acceptor, poller and sendfile if (acceptorThreadCount == 0) { // FIXME: Doesn't seem to work that well with multiple accept threads acceptorThreadCount = 1; } if (pollerThreadCount != 1) { // limit to one poller, no need for others pollerThreadCount = 1; } // Initialize SSL if needed * Poller class. // SSLContext.setVerify(sslContext, value, SSLVerifyDepth); */ if (running && !paused) { if (!"off".equalsIgnoreCase(SSLEngine)) { // Initialize SSL // FIXME: one per VM call ? if ("on".equalsIgnoreCase(SSLEngine)) { SSL.initialize(null); } else { SSL.initialize(SSLEngine); } // SSL protocol int value = SSL.SSL_PROTOCOL_ALL; if ("SSLv2".equalsIgnoreCase(SSLProtocol)) { value = SSL.SSL_PROTOCOL_SSLV2; } else if ("SSLv3".equalsIgnoreCase(SSLProtocol)) { value = SSL.SSL_PROTOCOL_SSLV3; } else if ("TLSv1".equalsIgnoreCase(SSLProtocol)) { value = SSL.SSL_PROTOCOL_TLSV1; } else if ("SSLv2+SSLv3".equalsIgnoreCase(SSLProtocol)) { value = SSL.SSL_PROTOCOL_SSLV2 | SSL.SSL_PROTOCOL_SSLV3; } // // Create SSL Context // sslContext = SSLContext.make(rootPool, value, SSL.SSL_MODE_SERVER); // // List the ciphers that the client is permitted to negotiate // SSLContext.setCipherSuite(sslContext, SSLCipherSuite); // // Load Server key and certificate // SSLContext.setCertificate(sslContext, SSLCertificateFile, SSLCertificateKeyFile, SSLPassword, SSL.SSL_AIDX_RSA); // // Set certificate chain file // SSLContext.setCertificateChainFile(sslContext, SSLCertificateChainFile, false); // // Support Client Certificates // SSLContext.setCACertificate(sslContext, SSLCACertificateFile, SSLCACertificatePath); // // Set revocation // SSLContext.setCARevocation(sslContext, SSLCARevocationFile, SSLCARevocationPath); // // Client certificate verification // value = SSL.SSL_CVERIFY_NONE; // if ("optional".equalsIgnoreCase(SSLVerifyClient)) { // value = SSL.SSL_CVERIFY_OPTIONAL; // } else if ("require".equalsIgnoreCase(SSLVerifyClient)) { // value = SSL.SSL_CVERIFY_REQUIRE; // } else if ("optionalNoCA".equalsIgnoreCase(SSLVerifyClient)) { // value = SSL.SSL_CVERIFY_OPTIONAL_NO_CA; // } initialized = true; } /** * Start the APR endpoint, creating acceptor, poller threads. */ public void start() throws Exception { // Initialize socket if not done before if (!initialized) { init(); } if (!running) { running = true; paused = false; // Create worker collection if (executor == null) { workers = new WorkerStack(maxThreads); } // Start acceptor threads for (int i = 0; i < acceptorThreadCount; i++) { Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i); acceptorThread.setPriority(threadPriority); acceptorThread.setDaemon(daemon); acceptorThread.start(); } // Start poller threads pollers = new Poller[pollerThreadCount]; for (int i = 0; i < pollerThreadCount; i++) { pollers[i] = new Poller(); pollers[i].init(); Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i); pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); pollerThread.start(); } } } /** * Pause the endpoint, which will make it stop accepting new sockets. */ public void pause() { } if (sslContext != 0) { paused = true; unlockAccept(); } } /** * Resume the endpoint, which will make it start accepting new sockets * again. */ public void resume() { if (running) { paused = false; } } /** * Stop the endpoint. This will cause all processing threads to stop. */ public void stop() { if (running) { running = false; unlockAccept(); for (int i = 0; i < pollers.length; i++) { pollers[i].destroy(); } pollers = null; } } /** * Deallocate APR memory pools, and close server socket. */ public void destroy() throws Exception { if (running) { stop(); } // Close server socket serverSock.socket().close(); serverSock.close(); serverSock = null; sslContext = 0; initialized = false; } // ------------------------------------------------------ Protected Methods /** * Get a sequence number used for thread naming. */ protected int getSequence() { return sequence++; } /** * Unlock the server socket accept using a bugus connection. */ protected void unlockAccept() { java.net.Socket s = null; try { // Need to create a connection to unlock the accept(); if (address == null) { s = new java.net.Socket("127.0.0.1", port); } else { s = new java.net.Socket(address, port); // setting soLinger to a small value will help shutdown the // connection quicker s.setSoLinger(true, 0); } } catch(Exception e) { if (log.isDebugEnabled()) { log.debug(sm.getString("endpoint.debug.unlock", "" + port), e); } } finally { if (s != null) { try { s.close(); } catch (Exception e) { // Ignore } } } } /** * Process the specified connection. */ protected boolean setSocketOptions(SocketChannel socket) { // Process the connection int step = 1; try { //disable blocking, APR style, we are gonna be polling it socket.configureBlocking(false); // 1: Set socket options: timeout, linger, etc if (soLinger >= 0) socket.socket().setSoLinger(true,soLinger); if (tcpNoDelay) socket.socket().setTcpNoDelay(true); if (soTimeout > 0) socket.socket().setSoTimeout(soTimeout); // 2: SSL handshake step = 2; try { } return Poll.create(size, pool, 0, timeout * 1000); // SSLSocket.attach(sslContext, socket); // if (SSLSocket.handshake(socket) != 0) { // if (log.isDebugEnabled()) { // log.debug(sm.getString("endpoint.err.handshake") + ": " + SSL.getLastError()); // } // return false; // } } getPoller().register(socket); } catch (Throwable t) { if (log.isDebugEnabled()) { if (step == 2) { log.debug(sm.getString("endpoint.err.handshake"), t); } else { log.debug(sm.getString("endpoint.err.unexpected"), t); } } // Tell to close the socket return false; } return true; } /** * Create (or allocate) and return an available processor for use in * processing a specific HTTP request, if possible. If the maximum * allowed processors have already been created and are in use, return * null instead. */ protected Worker createWorkerThread() { synchronized (workers) { if (workers.size() > 0) { curThreadsBusy++; return (workers.pop()); } if ((maxThreads > 0) && (curThreads < maxThreads)) { curThreadsBusy++; return (newWorkerThread()); } else { if (maxThreads < 0) { curThreadsBusy++; return (newWorkerThread()); } else { return (null); } } /** * Create and return a new processor suitable for processing HTTP * requests and returning the corresponding responses. */ protected Worker newWorkerThread() { Worker workerThread = new Worker(); workerThread.start(); return (workerThread); } /** * Return a new worker thread, and block while to worker is available. */ protected Worker getWorkerThread() { // Allocate a new worker thread Worker workerThread = createWorkerThread(); while (workerThread == null) { try { synchronized (workers) { workers.wait(); } } catch (InterruptedException e) { // Ignore } workerThread = createWorkerThread(); } return workerThread; } /** * Recycle the specified Processor so that it can be used again. * * @param workerThread The processor to be recycled */ protected void recycleWorkerThread(Worker workerThread) { synchronized (workers) { workers.push(workerThread); curThreadsBusy--; workers.notify(); } } /** * Allocate a new poller of the specified size. */ protected long allocatePoller(int size, long pool, int timeout) { } catch (Error e) { if (Status.APR_STATUS_IS_EINVAL(e.getError())) { log.info(sm.getString("endpoint.poll.limitedpollsize", "" + size)); return 0; } else { log.error(sm.getString("endpoint.poll.initfail"), e); return -1; } } } /** * Process given socket. */ protected boolean processSocket(SocketChannel socket) { try { if (executor == null) { getWorkerThread().assign(socket); } else { executor.execute(new SocketProcessor(socket)); } } catch (Throwable t) { // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full log.error(sm.getString("endpoint.process.fail"), t); return false; } return true; } /** * Process given socket for an event. */ protected boolean processSocket(SocketChannel socket, boolean error) { try { if (executor == null) { getWorkerThread().assign(socket, error); } else { executor.execute(new SocketEventProcessor(socket, error)); } } catch (Throwable t) { // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full log.error(sm.getString("endpoint.process.fail"), t); return false; } return true; } // --------------------------------------------------- Acceptor Inner Class /** * Server socket acceptor thread. */ protected class Acceptor implements Runnable { /** * The background thread that listens for incoming TCP/IP connections and * hands them off to an appropriate processor. */ public void run() { // Loop until we receive a shutdown command while (running) { // Loop if endpoint is paused while (paused) { try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore } } try { // Accept the next incoming connection from the server socket SocketChannel socket = serverSock.accept(); // Hand this socket off to an appropriate processor if(!setSocketOptions(socket)) { // Close socket right away socket.socket().close(); socket.close(); } } catch (Throwable t) { log.error(sm.getString("endpoint.accept.fail"), t); } // The processor will recycle itself when it finishes } } } // ----------------------------------------------------- Poller Inner Class public class Poller implements Runnable { protected Selector selector; protected LinkedList events = new LinkedList(); protected boolean close = false; protected long nextExpiration = 0;//optimize expiration handling protected int keepAliveCount = 0; public int getKeepAliveCount() { return keepAliveCount; } public Poller() throws IOException { this.selector = Selector.open(); } public Selector getSelector() { return selector;} /** * Create the poller. With some versions of APR, the maximum poller size will * be 62 (reocmpiling APR is necessary to remove this limitation). */ protected void init() { keepAliveCount = 0; } /** * Destroy the poller. */ protected void destroy() { // Wait for polltime before doing anything, so that the poller threads // exit, otherwise parallel descturction of sockets which are still // in the poller can cause problems try { synchronized (this) { this.wait(pollTime / 1000); } } catch (InterruptedException e) { // Ignore } close = true; } public void addEvent(Runnable event) { synchronized (events) { events.add(event); } selector.wakeup(); } /** * Add specified socket and associated pool to the poller. The socket will * be added to a temporary array, and polled first after a maximum amount * of time equal to pollTime (in most cases, latency will be much lower, * however). * * @param socket to add to the poller */ public void add(final SocketChannel socket) { final SelectionKey key = socket.keyFor(selector); KeyAttachment att = (KeyAttachment)key.attachment(); if ( att != null ) att.setWakeUp(false); Runnable r = new Runnable() { public void run() { try { if (key != null) key.interestOps(SelectionKey.OP_READ); }catch ( CancelledKeyException ckx ) { try { if ( key != null && key.attachment() != null ) { KeyAttachment ka = (KeyAttachment)key.attachment(); ka.setError(true); //set to collect this socket immediately } socket.socket().close(); socket.close(); } catch ( Exception ignore ) {} } } }; addEvent(r); } public boolean events() { boolean result = false; synchronized (events) { Runnable r = null; result = (events.size() > 0); while ( (events.size() > 0) && (r = events.removeFirst()) != null ) { try { r.run(); } catch ( Exception x ) { log.error("",x); } } events.clear(); } return result; } public void register(final SocketChannel socket) { SelectionKey key = socket.keyFor(selector); Runnable r = new Runnable() { public void run() { try { socket.register(selector, SelectionKey.OP_READ, new KeyAttachment()); } catch (Exception x) { log.error("", x); } } }; synchronized (events) { events.add(r); } selector.wakeup(); } public void cancelledKey(SelectionKey key) { try { KeyAttachment ka = (KeyAttachment) key.attachment(); key.cancel(); if (ka != null && ka.getComet()) processSocket( (SocketChannel) key.channel(), true); key.channel().close(); } catch (IOException e) { if ( log.isDebugEnabled() ) log.debug("",e); // Ignore } } /** * The background thread that listens for incoming TCP/IP connections and * hands them off to an appropriate processor. */ public void run() { // Loop until we receive a shutdown command while (running) { // Loop if endpoint is paused while (paused) { try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore } } boolean hasEvents = false; hasEvents = (hasEvents | events()); // Time to terminate? if (close) return; int keyCount = 0; try { keyCount = selector.select(selectorTimeout); } catch (Throwable x) { log.error("",x); continue; } //either we timed out or we woke up, process events first if ( keyCount == 0 ) hasEvents = (hasEvents | events()); //if (keyCount == 0) continue; Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) { SelectionKey sk = (SelectionKey) iterator.next(); iterator.remove(); KeyAttachment attachment = (KeyAttachment)sk.attachment(); try { if(attachment == null) attachment = new KeyAttachment(); attachment.access(); sk.attach(attachment); int readyOps = sk.readyOps(); sk.interestOps(sk.interestOps() & ~readyOps); SocketChannel channel = (SocketChannel)sk.channel(); boolean read = sk.isReadable(); if (read) { if ( attachment.getWakeUp() ) { attachment.setWakeUp(false); synchronized (attachment.getMutex()) {attachment.getMutex().notifyAll();} } else if ( attachment.getComet() ) { if (!processSocket(channel,false)) processSocket(channel,true); } else { boolean close = (!processSocket(channel)); if ( close ) { channel.socket().close(); if ((event) && (handler.event(socket, error) == Handler.SocketState.CLOSED)) { // Close socket and pool try { channel.close(); } } } if (sk.isValid() && sk.isWritable()) { } } catch ( CancelledKeyException ckx ) { if (attachment!=null && attachment.getComet()) processSocket( (SocketChannel) sk.channel(), true); try { sk.channel().close(); }catch ( Exception ignore){} } catch (Throwable t) { log.error("",t); } }//while //process timeouts timeout(keyCount,hasEvents); }//while synchronized (this) { this.notifyAll(); } } protected void timeout(int keyCount, boolean hasEvents) { long now = System.currentTimeMillis(); //don't process timeouts too frequently, but if the selector simply timed out //then we can check timeouts to avoid gaps if ( (now < nextExpiration) && (keyCount>0 || hasEvents) ) return; nextExpiration = now + (long)soTimeout; //timeout Set keys = selector.keys(); for (Iterator iter = keys.iterator(); iter.hasNext(); ) { SelectionKey key = iter.next(); try { KeyAttachment ka = (KeyAttachment) key.attachment(); if ( ka == null ) { cancelledKey(key); //we don't support any keys without attachments } else if ( ka.getError() ) { cancelledKey(key); }else if ((key.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) { //only timeout sockets that we are waiting for a read from long delta = now - ka.getLastAccess(); long timeout = (ka.getTimeout()==-1)?((long) soTimeout):(ka.getTimeout()); } boolean isTimedout = delta > timeout; if (isTimedout) { cancelledKey(key); } else { long nextTime = now+(timeout-delta); nextExpiration = (nextTime < nextExpiration)?nextTime:nextExpiration; } }//end if }catch ( CancelledKeyException ckx ) { cancelledKey(key); } }//for } } public static class KeyAttachment { public long getLastAccess() { return lastAccess; } public void access() { access(System.currentTimeMillis()); } public void access(long access) { lastAccess = access; } public void setComet(boolean comet) { this.comet = comet; } public boolean getComet() { return comet; } public boolean getCurrentAccess() { return currentAccess; } public void setCurrentAccess(boolean access) { currentAccess = access; } public boolean getWakeUp() { return wakeUp; } public void setWakeUp(boolean wakeUp) { this.wakeUp = wakeUp; } public Object getMutex() {return mutex;} public void setTimeout(long timeout) {this.timeout = timeout;} public long getTimeout() {return this.timeout;} public boolean getError() { return error; } public void setError(boolean error) { this.error = error; } protected Object mutex = new Object(); protected boolean wakeUp = false; protected long lastAccess = System.currentTimeMillis(); protected boolean currentAccess = false; protected boolean comet = false; protected long timeout = -1; protected boolean error = false; } // ----------------------------------------------------- Worker Inner Class /** * Server processor class. */ protected class Worker implements Runnable { protected Thread thread = null; protected boolean available = false; protected SocketChannel socket = null; protected boolean event = false; protected boolean error = false; /** * Process an incoming TCP/IP connection on the specified socket. Any * exception that occurs during processing must be logged and swallowed. * NOTE: This method is called from our Connector's thread. We * must assign it to our own thread so that multiple simultaneous * requests can be handled. * * @param socket TCP socket to process */ protected synchronized void assign(SocketChannel socket) { // Wait for the Processor to get the previous Socket while (available) { try { wait(); } catch (InterruptedException e) { } } // Store the newly available Socket and notify our thread this.socket = socket; event = false; error = false; available = true; notifyAll(); } protected synchronized void assign(SocketChannel socket, boolean error) { // Wait for the Processor to get the previous Socket while (available) { try { wait(); } catch (InterruptedException e) { } // Store the newly available Socket and notify our thread this.socket = socket; event = true; this.error = error; available = true; notifyAll(); } /** * Await a newly assigned Socket from our Connector, or null * if we are supposed to shut down. */ protected synchronized SocketChannel await() { // Wait for the Connector to provide a new Socket while (!available) { try { wait(); } catch (InterruptedException e) { } } // Notify the Connector that we have received this Socket SocketChannel socket = this.socket; available = false; notifyAll(); return (socket); } /** * The background thread that listens for incoming TCP/IP connections and * hands them off to an appropriate processor. */ public void run() { // Process requests until we receive a shutdown signal while (running) { // Wait for the next socket to be assigned SocketChannel socket = await(); if (socket == null) continue; // Process the request from this socket socket.socket().close(); // ------------------------------------------------- WorkerStack Inner Class socket.close(); }catch ( Exception x ) { log.error("",x); } } else if ((!event) && (handler.process(socket) == Handler.SocketState.CLOSED)) { // Close socket and pool try { socket.socket().close(); socket.close(); }catch ( Exception x ) { log.error("",x); } } // Finish up this request recycleWorkerThread(this); } } /** * Start the background processing thread. */ public void start() { thread = new Thread(this); thread.setName(getName() + "-" + (++curThreads)); thread.setDaemon(true); thread.start(); } } // ------------------------------------------------ Handler Inner Interface /** * Bare bones interface used for socket processing. Per thread data is to be * stored in the ThreadWithAttributes extra folders, or alternately in * thread local fields. */ public interface Handler { public enum SocketState { OPEN, CLOSED, LONG } public SocketState process(SocketChannel socket); public SocketState event(SocketChannel socket, boolean error); } public class WorkerStack { protected Worker[] workers = null; protected int end = 0; public WorkerStack(int size) { workers = new Worker[size]; } /** * Put the object into the queue. * * @param object the object to be appended to the queue (first element). */ public void push(Worker worker) { workers[end++] = worker; } /** * Get the first object out of the queue. Return null if the queue * is empty. */ public Worker pop() { if (end > 0) { return workers[--end]; } return null; } /** * Get the first object out of the queue, Return null if the queue * is empty. */ public Worker peek() { return workers[end]; } /** * Is the queue empty? */ public boolean isEmpty() { return (end == 0); } /** * How many elements are there in this queue? */ public int size() { return (end); } } // ---------------------------------------------- SocketProcessor Inner Class /** * This class is the equivalent of the Worker, but will simply use in an * external Executor thread pool. */ protected class SocketProcessor implements Runnable { protected SocketChannel socket = null; public SocketProcessor(SocketChannel socket) { this.socket = socket; } public void run() { // Process the request from this socket if (handler.process(socket) == Handler.SocketState.CLOSED) { // Close socket and pool try { socket.socket().close(); socket.close(); } catch ( Exception x ) { log.error("",x); } socket = null; } } } // --------------------------------------- SocketEventProcessor Inner Class /** * This class is the equivalent of the Worker, but will simply use in an * external Executor thread pool. */ protected class SocketEventProcessor implements Runnable { protected SocketChannel socket = null; protected boolean error = false; public SocketEventProcessor(SocketChannel socket, boolean error) { this.socket = socket; this.error = error; } public void run() { // Process the request from this socket if (handler.event(socket, error) == Handler.SocketState.CLOSED) { // Close socket and pool try { socket.socket().close(); socket.close(); } catch ( Exception x ) { log.error("",x); } socket = null; } } } } >>>>>>> c705e291c22dbbe2c8e25822fd1911a4481ef618
Solution content
 *
 * @author Mladen Turk
    /**
            }
        }

    }


    /**

    /**
     */



/*
 *  Copyright 2005-2006 The Apache Software Foundation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package org.apache.tomcat.util.net;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.Executor;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tomcat.jni.Error;
import org.apache.tomcat.jni.Library;
import org.apache.tomcat.jni.Poll;
import org.apache.tomcat.jni.SSL;
import org.apache.tomcat.jni.Status;
import org.apache.tomcat.util.res.StringManager;

/**
 * NIO tailored thread pool, providing the following services:
 * 
    *
  • Socket acceptor thread
  • *
  • Socket poller thread
  • *
  • Worker threads pool
  • *
* * When switching to Java 5, there's an opportunity to use the virtual * machine's thread pool. /** */ /** */ } * Current worker threads busy count. /** * @author Remy Maucherat * @author Filip Hanik */ public class NioEndpoint { // -------------------------------------------------------------- Constants protected static Log log = LogFactory.getLog(NioEndpoint.class); protected static StringManager sm = StringManager.getManager("org.apache.tomcat.util.net.res"); /** * The Request attribute key for the cipher suite. */ public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite"; /** * The Request attribute key for the key size. */ public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size"; /** * The Request attribute key for the client certificate chain. */ public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate"; /** * The Request attribute key for the session id. * This one is a Tomcat extension to the Servlet spec. */ public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session"; // ----------------------------------------------------------------- Fields /** * Available workers. */ protected WorkerStack workers = null; /** * Running state of the endpoint. */ protected volatile boolean running = false; /** * Will be set to true whenever the endpoint is paused. */ protected volatile boolean paused = false; /** * Track the initialization state of the endpoint. */ protected boolean initialized = false; protected int curThreadsBusy = 0; /** * Current worker threads count. */ protected int curThreads = 0; /** * Sequence number used to generate thread names. */ protected int sequence = 0; /** * Root APR memory pool. */ protected long rootPool = 0; /** * Server socket "pointer". */ protected ServerSocketChannel serverSock = null; /** * APR memory pool for the server socket. */ protected long serverSockPool = 0; /** * SSL context. */ protected long sslContext = 0; // ------------------------------------------------------------- Properties /** * External Executor based thread pool. */ protected Executor executor = null; public void setExecutor(Executor executor) { this.executor = executor; } public Executor getExecutor() { return executor; } /** * Maximum amount of worker threads. */ protected int maxThreads = 40; public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; } public int getMaxThreads() { return maxThreads; } /** * Priority of the acceptor and poller threads. */ protected int threadPriority = Thread.NORM_PRIORITY; public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; } public int getThreadPriority() { return threadPriority; } /** * Size of the socket poller. */ protected int pollerSize = 8 * 1024; public void setPollerSize(int pollerSize) { this.pollerSize = pollerSize; } public int getPollerSize() { return pollerSize; } /** * Server socket port. */ protected int port; public int getPort() { return port; } public void setPort(int port ) { this.port=port; } /** * Address for the server socket. */ protected InetAddress address; public InetAddress getAddress() { return address; } public void setAddress(InetAddress address) { this.address = address; } /** * Handling of accepted sockets. */ protected Handler handler = null; public void setHandler(Handler handler ) { this.handler = handler; } public Handler getHandler() { return handler; } /** * Allows the server developer to specify the backlog that * should be used for server sockets. By default, this value * is 100. */ protected int backlog = 100; public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; } public int getBacklog() { return backlog; } /** * Socket TCP no delay. */ protected boolean tcpNoDelay = false; public boolean getTcpNoDelay() { return tcpNoDelay; } public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; } /** * Socket linger. */ protected int soLinger = 100; public int getSoLinger() { return soLinger; } public void setSoLinger(int soLinger) { this.soLinger = soLinger; } /** * Socket timeout. */ protected int soTimeout = -1; public int getSoTimeout() { return soTimeout; } public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; } /** * Timeout on first request read before going to the poller, in ms. */ protected int firstReadTimeout = 60000; public int getFirstReadTimeout() { return firstReadTimeout; } public void setFirstReadTimeout(int firstReadTimeout) { this.firstReadTimeout = firstReadTimeout; } /** * Poll interval, in microseconds. The smaller the value, the more CPU the poller * will use, but the more responsive to activity it will be. */ protected int pollTime = 2000; public int getPollTime() { return pollTime; } public void setPollTime(int pollTime) { if (pollTime > 0) { this.pollTime = pollTime; } } /** * The default is true - the created threads will be * in daemon mode. If set to false, the control thread * will not be daemon - and will keep the process alive. */ protected boolean daemon = true; public void setDaemon(boolean b) { daemon = b; } public boolean getDaemon() { return daemon; } /** * Name of the thread pool, which will be used for naming child threads. */ protected String name = "TP"; public void setName(String name) { this.name = name; } public String getName() { return name; } /** * Allow comet request handling. */ protected boolean useComet = true; public void setUseComet(boolean useComet) { this.useComet = useComet; } public boolean getUseComet() { return useComet; } /** * Acceptor thread count. */ protected int acceptorThreadCount = 0; public void setAcceptorThreadCount(int acceptorThreadCount) { this.acceptorThreadCount = acceptorThreadCount; } public int getAcceptorThreadCount() { return acceptorThreadCount; } /** * Poller thread count. */ protected int pollerThreadCount = 0; public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadCount = pollerThreadCount; } public int getPollerThreadCount() { return pollerThreadCount; } protected long selectorTimeout = 1000; public void setSelectorTimeout(long timeout){ this.selectorTimeout = timeout;} public long getSelectorTimeout(){ return this.selectorTimeout; } /** * The socket poller. */ protected Poller[] pollers = null; protected int pollerRoundRobin = 0; public Poller getPoller() { pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length; Poller poller = pollers[pollerRoundRobin]; return poller; } /** * The socket poller used for Comet support. */ public Poller getCometPoller() { Poller poller = getPoller(); return poller; } /** * Dummy maxSpareThreads property. */ public int getMaxSpareThreads() { return 0; } /** * Dummy minSpareThreads property. */ public int getMinSpareThreads() { return 0; } /** * SSL engine. */ protected String SSLEngine = "off"; public String getSSLEngine() { return SSLEngine; } public void setSSLEngine(String SSLEngine) { this.SSLEngine = SSLEngine; } /** * SSL protocols. */ protected String SSLProtocol = "all"; public String getSSLProtocol() { return SSLProtocol; } public void setSSLProtocol(String SSLProtocol) { this.SSLProtocol = SSLProtocol; } /** * SSL password (if a cert is encrypted, and no password has been provided, a callback * will ask for a password). */ protected String SSLPassword = null; public String getSSLPassword() { return SSLPassword; } public void setSSLPassword(String SSLPassword) { this.SSLPassword = SSLPassword; } /** * SSL cipher suite. */ protected String SSLCipherSuite = "ALL"; public String getSSLCipherSuite() { return SSLCipherSuite; } public void setSSLCipherSuite(String SSLCipherSuite) { this.SSLCipherSuite = SSLCipherSuite; } * SSL verify client. // value = SSL.SSL_CVERIFY_OPTIONAL_NO_CA; // } * SSL certificate file. */ protected String SSLCertificateFile = null; public String getSSLCertificateFile() { return SSLCertificateFile; } public void setSSLCertificateFile(String SSLCertificateFile) { this.SSLCertificateFile = SSLCertificateFile; } /** * SSL certificate key file. */ protected String SSLCertificateKeyFile = null; public String getSSLCertificateKeyFile() { return SSLCertificateKeyFile; } public void setSSLCertificateKeyFile(String SSLCertificateKeyFile) { this.SSLCertificateKeyFile = SSLCertificateKeyFile; } /** * SSL certificate chain file. */ protected String SSLCertificateChainFile = null; public String getSSLCertificateChainFile() { return SSLCertificateChainFile; } public void setSSLCertificateChainFile(String SSLCertificateChainFile) { this.SSLCertificateChainFile = SSLCertificateChainFile; } /** * SSL CA certificate path. */ protected String SSLCACertificatePath = null; public String getSSLCACertificatePath() { return SSLCACertificatePath; } public void setSSLCACertificatePath(String SSLCACertificatePath) { this.SSLCACertificatePath = SSLCACertificatePath; } /** * SSL CA certificate file. */ protected String SSLCACertificateFile = null; public String getSSLCACertificateFile() { return SSLCACertificateFile; } public void setSSLCACertificateFile(String SSLCACertificateFile) { this.SSLCACertificateFile = SSLCACertificateFile; } /** * SSL CA revocation path. */ protected String SSLCARevocationPath = null; public String getSSLCARevocationPath() { return SSLCARevocationPath; } public void setSSLCARevocationPath(String SSLCARevocationPath) { this.SSLCARevocationPath = SSLCARevocationPath; } /** * SSL CA revocation file. */ protected String SSLCARevocationFile = null; public String getSSLCARevocationFile() { return SSLCARevocationFile; } public void setSSLCARevocationFile(String SSLCARevocationFile) { this.SSLCARevocationFile = SSLCARevocationFile; } protected String SSLVerifyClient = "none"; public String getSSLVerifyClient() { return SSLVerifyClient; } public void setSSLVerifyClient(String SSLVerifyClient) { this.SSLVerifyClient = SSLVerifyClient; } /** * SSL verify depth. */ protected int SSLVerifyDepth = 10; public int getSSLVerifyDepth() { return SSLVerifyDepth; } public void setSSLVerifyDepth(int SSLVerifyDepth) { this.SSLVerifyDepth = SSLVerifyDepth; } // --------------------------------------------------------- Public Methods /** * Number of keepalive sockets. */ public int getKeepAliveCount() { if (pollers == null) { return 0; } else { int keepAliveCount = 0; for (int i = 0; i < pollers.length; i++) { keepAliveCount += pollers[i].getKeepAliveCount(); } return keepAliveCount; } } /** * Return the amount of threads that are managed by the pool. * * @return the amount of threads that are managed by the pool */ public int getCurrentThreadCount() { return curThreads; } /** * Return the amount of threads currently busy. * * @return the amount of threads currently busy */ public int getCurrentThreadsBusy() { return curThreadsBusy; } /** * Return the state of the endpoint. * * @return true if the endpoint is running, false otherwise */ public boolean isRunning() { return running; } /** * Return the state of the endpoint. * * @return true if the endpoint is paused, false otherwise */ public boolean isPaused() { return paused; } // ----------------------------------------------- Public Lifecycle Methods /** * Initialize the endpoint. */ public void init() throws Exception { if (initialized) return; serverSock = ServerSocketChannel.open(); InetSocketAddress addr = (address!=null?new InetSocketAddress(address,port):new InetSocketAddress(port)); serverSock.socket().bind(addr,100); //todo, set backlog value serverSock.configureBlocking(true); //mimic APR behavior // Initialize thread count defaults for acceptor, poller and sendfile if (acceptorThreadCount == 0) { // FIXME: Doesn't seem to work that well with multiple accept threads acceptorThreadCount = 1; } if (pollerThreadCount != 1) { // limit to one poller, no need for others pollerThreadCount = 1; } // Initialize SSL if needed if (!"off".equalsIgnoreCase(SSLEngine)) { // Initialize SSL // FIXME: one per VM call ? if ("on".equalsIgnoreCase(SSLEngine)) { SSL.initialize(null); } else { SSL.initialize(SSLEngine); } // SSL protocol int value = SSL.SSL_PROTOCOL_ALL; if ("SSLv2".equalsIgnoreCase(SSLProtocol)) { value = SSL.SSL_PROTOCOL_SSLV2; } else if ("SSLv3".equalsIgnoreCase(SSLProtocol)) { value = SSL.SSL_PROTOCOL_SSLV3; } else if ("TLSv1".equalsIgnoreCase(SSLProtocol)) { value = SSL.SSL_PROTOCOL_TLSV1; } else if ("SSLv2+SSLv3".equalsIgnoreCase(SSLProtocol)) { value = SSL.SSL_PROTOCOL_SSLV2 | SSL.SSL_PROTOCOL_SSLV3; } // // Create SSL Context // sslContext = SSLContext.make(rootPool, value, SSL.SSL_MODE_SERVER); // // List the ciphers that the client is permitted to negotiate // SSLContext.setCipherSuite(sslContext, SSLCipherSuite); // // Load Server key and certificate // SSLContext.setCertificate(sslContext, SSLCertificateFile, SSLCertificateKeyFile, SSLPassword, SSL.SSL_AIDX_RSA); // // Set certificate chain file // SSLContext.setCertificateChainFile(sslContext, SSLCertificateChainFile, false); // // Support Client Certificates // SSLContext.setCACertificate(sslContext, SSLCACertificateFile, SSLCACertificatePath); // // Set revocation // SSLContext.setCARevocation(sslContext, SSLCARevocationFile, SSLCARevocationPath); // // Client certificate verification // value = SSL.SSL_CVERIFY_NONE; // if ("optional".equalsIgnoreCase(SSLVerifyClient)) { // value = SSL.SSL_CVERIFY_OPTIONAL; // } else if ("require".equalsIgnoreCase(SSLVerifyClient)) { // value = SSL.SSL_CVERIFY_REQUIRE; // } else if ("optionalNoCA".equalsIgnoreCase(SSLVerifyClient)) { // SSLContext.setVerify(sslContext, value, SSLVerifyDepth); } initialized = true; } /** * Start the APR endpoint, creating acceptor, poller threads. */ public void start() throws Exception { // Initialize socket if not done before if (!initialized) { init(); } if (!running) { running = true; paused = false; // Create worker collection if (executor == null) { workers = new WorkerStack(maxThreads); } // Start acceptor threads for (int i = 0; i < acceptorThreadCount; i++) { Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i); acceptorThread.setPriority(threadPriority); acceptorThread.setDaemon(daemon); acceptorThread.start(); } // Start poller threads pollers = new Poller[pollerThreadCount]; for (int i = 0; i < pollerThreadCount; i++) { pollers[i] = new Poller(); pollers[i].init(); Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i); pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); pollerThread.start(); } } } /** * Pause the endpoint, which will make it stop accepting new sockets. */ public void pause() { if (running && !paused) { paused = true; unlockAccept(); } } /** * Resume the endpoint, which will make it start accepting new sockets * again. */ public void resume() { if (running) { paused = false; } /** * Stop the endpoint. This will cause all processing threads to stop. */ public void stop() { if (running) { running = false; unlockAccept(); for (int i = 0; i < pollers.length; i++) { pollers[i].destroy(); } pollers = null; } } /** * Deallocate APR memory pools, and close server socket. */ public void destroy() throws Exception { if (running) { stop(); } // Close server socket serverSock.socket().close(); serverSock.close(); serverSock = null; sslContext = 0; initialized = false; } // ------------------------------------------------------ Protected Methods /** * Get a sequence number used for thread naming. */ protected int getSequence() { return sequence++; } * Server socket acceptor thread. * Unlock the server socket accept using a bugus connection. */ protected void unlockAccept() { java.net.Socket s = null; try { // Need to create a connection to unlock the accept(); if (address == null) { s = new java.net.Socket("127.0.0.1", port); } else { s = new java.net.Socket(address, port); // setting soLinger to a small value will help shutdown the // connection quicker s.setSoLinger(true, 0); } } catch(Exception e) { if (log.isDebugEnabled()) { log.debug(sm.getString("endpoint.debug.unlock", "" + port), e); } } finally { if (s != null) { try { s.close(); } catch (Exception e) { // Ignore } } } } /** * Process the specified connection. */ protected boolean setSocketOptions(SocketChannel socket) { // Process the connection int step = 1; try { //disable blocking, APR style, we are gonna be polling it socket.configureBlocking(false); // 1: Set socket options: timeout, linger, etc if (soLinger >= 0) socket.socket().setSoLinger(true,soLinger); if (tcpNoDelay) socket.socket().setTcpNoDelay(true); if (soTimeout > 0) socket.socket().setSoTimeout(soTimeout); // 2: SSL handshake step = 2; if (sslContext != 0) { // SSLSocket.attach(sslContext, socket); // if (SSLSocket.handshake(socket) != 0) { // if (log.isDebugEnabled()) { // log.debug(sm.getString("endpoint.err.handshake") + ": " + SSL.getLastError()); // } // return false; // } } getPoller().register(socket); } catch (Throwable t) { if (log.isDebugEnabled()) { if (step == 2) { log.debug(sm.getString("endpoint.err.handshake"), t); } else { log.debug(sm.getString("endpoint.err.unexpected"), t); } } // Tell to close the socket return false; } return true; } /** * Create (or allocate) and return an available processor for use in * processing a specific HTTP request, if possible. If the maximum * allowed processors have already been created and are in use, return * null instead. */ protected Worker createWorkerThread() { synchronized (workers) { if (workers.size() > 0) { curThreadsBusy++; return (workers.pop()); } if ((maxThreads > 0) && (curThreads < maxThreads)) { curThreadsBusy++; return (newWorkerThread()); } else { if (maxThreads < 0) { curThreadsBusy++; return (newWorkerThread()); } else { return (null); } * Create and return a new processor suitable for processing HTTP * requests and returning the corresponding responses. */ protected Worker newWorkerThread() { Worker workerThread = new Worker(); workerThread.start(); return (workerThread); } /** * Return a new worker thread, and block while to worker is available. */ protected Worker getWorkerThread() { // Allocate a new worker thread Worker workerThread = createWorkerThread(); while (workerThread == null) { try { synchronized (workers) { workers.wait(); } } catch (InterruptedException e) { // Ignore } workerThread = createWorkerThread(); } return workerThread; } /** * Recycle the specified Processor so that it can be used again. * * @param workerThread The processor to be recycled */ protected void recycleWorkerThread(Worker workerThread) { synchronized (workers) { workers.push(workerThread); curThreadsBusy--; workers.notify(); } } /** * Allocate a new poller of the specified size. */ protected long allocatePoller(int size, long pool, int timeout) { try { return Poll.create(size, pool, 0, timeout * 1000); } catch (Error e) { if (Status.APR_STATUS_IS_EINVAL(e.getError())) { log.info(sm.getString("endpoint.poll.limitedpollsize", "" + size)); return 0; } else { log.error(sm.getString("endpoint.poll.initfail"), e); return -1; } } } /** * Process given socket. */ protected boolean processSocket(SocketChannel socket) { try { if (executor == null) { getWorkerThread().assign(socket); } else { executor.execute(new SocketProcessor(socket)); } } catch (Throwable t) { // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full log.error(sm.getString("endpoint.process.fail"), t); return false; } return true; } /** * Process given socket for an event. */ protected boolean processSocket(SocketChannel socket, boolean error) { try { if (executor == null) { getWorkerThread().assign(socket, error); } else { executor.execute(new SocketEventProcessor(socket, error)); } } catch (Throwable t) { // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full log.error(sm.getString("endpoint.process.fail"), t); return false; } return true; } // --------------------------------------------------- Acceptor Inner Class protected class Acceptor implements Runnable { /** * The background thread that listens for incoming TCP/IP connections and * hands them off to an appropriate processor. */ public void run() { // Loop until we receive a shutdown command while (running) { // Loop if endpoint is paused while (paused) { try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore } } try { // Accept the next incoming connection from the server socket SocketChannel socket = serverSock.accept(); // Hand this socket off to an appropriate processor if(!setSocketOptions(socket)) { // Close socket right away socket.socket().close(); socket.close(); } } catch (Throwable t) { log.error(sm.getString("endpoint.accept.fail"), t); } // The processor will recycle itself when it finishes } } } // ----------------------------------------------------- Poller Inner Class /** * Poller class. */ public class Poller implements Runnable { protected Selector selector; protected LinkedList events = new LinkedList(); protected boolean close = false; protected long nextExpiration = 0;//optimize expiration handling protected int keepAliveCount = 0; public int getKeepAliveCount() { return keepAliveCount; } public Poller() throws IOException { this.selector = Selector.open(); } public Selector getSelector() { return selector;} /** * Create the poller. With some versions of APR, the maximum poller size will * be 62 (reocmpiling APR is necessary to remove this limitation). */ protected void init() { keepAliveCount = 0; } /** * Destroy the poller. */ protected void destroy() { // Wait for polltime before doing anything, so that the poller threads // exit, otherwise parallel descturction of sockets which are still // in the poller can cause problems try { synchronized (this) { this.wait(pollTime / 1000); } } catch (InterruptedException e) { // Ignore } close = true; } public void addEvent(Runnable event) { synchronized (events) { events.add(event); } selector.wakeup(); } /** * Add specified socket and associated pool to the poller. The socket will * be added to a temporary array, and polled first after a maximum amount /** * of time equal to pollTime (in most cases, latency will be much lower, * however). * * @param socket to add to the poller */ public void add(final SocketChannel socket) { final SelectionKey key = socket.keyFor(selector); KeyAttachment att = (KeyAttachment)key.attachment(); if ( att != null ) att.setWakeUp(false); Runnable r = new Runnable() { public void run() { try { if (key != null) key.interestOps(SelectionKey.OP_READ); }catch ( CancelledKeyException ckx ) { try { if ( key != null && key.attachment() != null ) { KeyAttachment ka = (KeyAttachment)key.attachment(); ka.setError(true); //set to collect this socket immediately } socket.socket().close(); socket.close(); } catch ( Exception ignore ) {} } } }; addEvent(r); } public boolean events() { boolean result = false; synchronized (events) { Runnable r = null; result = (events.size() > 0); while ( (events.size() > 0) && (r = events.removeFirst()) != null ) { try { r.run(); } catch ( Exception x ) { log.error("",x); } } events.clear(); } return result; } public void register(final SocketChannel socket) { SelectionKey key = socket.keyFor(selector); Runnable r = new Runnable() { public void run() { try { socket.register(selector, SelectionKey.OP_READ, new KeyAttachment()); } catch (Exception x) { log.error("", x); } } }; synchronized (events) { events.add(r); } selector.wakeup(); } public void cancelledKey(SelectionKey key) { try { KeyAttachment ka = (KeyAttachment) key.attachment(); key.cancel(); if (ka != null && ka.getComet()) processSocket( (SocketChannel) key.channel(), true); key.channel().close(); } catch (IOException e) { if ( log.isDebugEnabled() ) log.debug("",e); // Ignore } } /** * The background thread that listens for incoming TCP/IP connections and * hands them off to an appropriate processor. */ public void run() { // Loop until we receive a shutdown command while (running) { // Loop if endpoint is paused while (paused) { try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore } } boolean hasEvents = false; hasEvents = (hasEvents | events()); // Time to terminate? if (close) return; int keyCount = 0; try { keyCount = selector.select(selectorTimeout); } catch (Throwable x) { log.error("",x); continue; } //either we timed out or we woke up, process events first if ( keyCount == 0 ) hasEvents = (hasEvents | events()); //if (keyCount == 0) continue; Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) { SelectionKey sk = (SelectionKey) iterator.next(); iterator.remove(); KeyAttachment attachment = (KeyAttachment)sk.attachment(); try { if(attachment == null) attachment = new KeyAttachment(); attachment.access(); sk.attach(attachment); int readyOps = sk.readyOps(); sk.interestOps(sk.interestOps() & ~readyOps); SocketChannel channel = (SocketChannel)sk.channel(); boolean read = sk.isReadable(); if (read) { if ( attachment.getWakeUp() ) { attachment.setWakeUp(false); synchronized (attachment.getMutex()) {attachment.getMutex().notifyAll();} } else if ( attachment.getComet() ) { if (!processSocket(channel,false)) processSocket(channel,true); } else { boolean close = (!processSocket(channel)); if ( close ) { channel.socket().close(); channel.close(); } } } if (sk.isValid() && sk.isWritable()) { } } catch ( CancelledKeyException ckx ) { if (attachment!=null && attachment.getComet()) processSocket( (SocketChannel) sk.channel(), true); try { sk.channel().close(); }catch ( Exception ignore){} } catch (Throwable t) { } log.error("",t); } }//while //process timeouts timeout(keyCount,hasEvents); }//while synchronized (this) { this.notifyAll(); } } protected void timeout(int keyCount, boolean hasEvents) { long now = System.currentTimeMillis(); //don't process timeouts too frequently, but if the selector simply timed out //then we can check timeouts to avoid gaps if ( (now < nextExpiration) && (keyCount>0 || hasEvents) ) return; nextExpiration = now + (long)soTimeout; //timeout Set keys = selector.keys(); for (Iterator iter = keys.iterator(); iter.hasNext(); ) { SelectionKey key = iter.next(); try { KeyAttachment ka = (KeyAttachment) key.attachment(); if ( ka == null ) { cancelledKey(key); //we don't support any keys without attachments } else if ( ka.getError() ) { cancelledKey(key); }else if ((key.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) { //only timeout sockets that we are waiting for a read from long delta = now - ka.getLastAccess(); long timeout = (ka.getTimeout()==-1)?((long) soTimeout):(ka.getTimeout()); boolean isTimedout = delta > timeout; if (isTimedout) { cancelledKey(key); } else { long nextTime = now+(timeout-delta); nextExpiration = (nextTime < nextExpiration)?nextTime:nextExpiration; } }//end if }catch ( CancelledKeyException ckx ) { cancelledKey(key); } }//for } } public static class KeyAttachment { public long getLastAccess() { return lastAccess; } public void access() { access(System.currentTimeMillis()); } public void access(long access) { lastAccess = access; } public void setComet(boolean comet) { this.comet = comet; } public boolean getComet() { return comet; } public boolean getCurrentAccess() { return currentAccess; } public void setCurrentAccess(boolean access) { currentAccess = access; } public boolean getWakeUp() { return wakeUp; } public void setWakeUp(boolean wakeUp) { this.wakeUp = wakeUp; } public Object getMutex() {return mutex;} public void setTimeout(long timeout) {this.timeout = timeout;} public long getTimeout() {return this.timeout;} public boolean getError() { return error; } public void setError(boolean error) { this.error = error; } protected Object mutex = new Object(); protected boolean wakeUp = false; protected long lastAccess = System.currentTimeMillis(); protected boolean currentAccess = false; protected boolean comet = false; protected long timeout = -1; protected boolean error = false; } // ----------------------------------------------------- Worker Inner Class /** * Server processor class. */ protected class Worker implements Runnable { protected Thread thread = null; protected boolean available = false; protected SocketChannel socket = null; protected boolean event = false; protected boolean error = false; * Process an incoming TCP/IP connection on the specified socket. Any * exception that occurs during processing must be logged and swallowed. * NOTE: This method is called from our Connector's thread. We * must assign it to our own thread so that multiple simultaneous * requests can be handled. * * @param socket TCP socket to process */ protected synchronized void assign(SocketChannel socket) { // Wait for the Processor to get the previous Socket while (available) { try { wait(); } catch (InterruptedException e) { } } // Store the newly available Socket and notify our thread this.socket = socket; event = false; error = false; available = true; notifyAll(); } protected synchronized void assign(SocketChannel socket, boolean error) { // Wait for the Processor to get the previous Socket while (available) { try { wait(); } catch (InterruptedException e) { } } // Store the newly available Socket and notify our thread this.socket = socket; event = true; this.error = error; available = true; notifyAll(); } /** * Await a newly assigned Socket from our Connector, or null * if we are supposed to shut down. */ protected synchronized SocketChannel await() { // Wait for the Connector to provide a new Socket while (!available) { try { wait(); } catch (InterruptedException e) { } } // Notify the Connector that we have received this Socket SocketChannel socket = this.socket; available = false; notifyAll(); return (socket); } /** * The background thread that listens for incoming TCP/IP connections and * hands them off to an appropriate processor. */ public void run() { // Process requests until we receive a shutdown signal while (running) { // Wait for the next socket to be assigned SocketChannel socket = await(); if (socket == null) continue; // Process the request from this socket if ((event) && (handler.event(socket, error) == Handler.SocketState.CLOSED)) { // Close socket and pool try { socket.socket().close(); socket.close(); }catch ( Exception x ) { log.error("",x); } } else if ((!event) && (handler.process(socket) == Handler.SocketState.CLOSED)) { // Close socket and pool try { socket.socket().close(); socket.close(); }catch ( Exception x ) { log.error("",x); } } // Finish up this request recycleWorkerThread(this); } } /** * Start the background processing thread. */ public void start() { thread = new Thread(this); thread.setName(getName() + "-" + (++curThreads)); thread.setDaemon(true); thread.start(); } } // ------------------------------------------------ Handler Inner Interface /** * Bare bones interface used for socket processing. Per thread data is to be * stored in the ThreadWithAttributes extra folders, or alternately in * thread local fields. */ public interface Handler { public enum SocketState { OPEN, CLOSED, LONG } public SocketState process(SocketChannel socket); public SocketState event(SocketChannel socket, boolean error); } // ------------------------------------------------- WorkerStack Inner Class public class WorkerStack { protected Worker[] workers = null; protected int end = 0; public WorkerStack(int size) { workers = new Worker[size]; } /** * Put the object into the queue. * * @param object the object to be appended to the queue (first element). */ public void push(Worker worker) { workers[end++] = worker; } /** * Get the first object out of the queue. Return null if the queue * is empty. */ public Worker pop() { if (end > 0) { return workers[--end]; } return null; } /** * Get the first object out of the queue, Return null if the queue * is empty. */ public Worker peek() { return workers[end]; } /** * Is the queue empty? */ public boolean isEmpty() { return (end == 0); } /** * How many elements are there in this queue? */ public int size() { return (end); } } // ---------------------------------------------- SocketProcessor Inner Class /** * This class is the equivalent of the Worker, but will simply use in an * external Executor thread pool. */ protected class SocketProcessor implements Runnable { protected SocketChannel socket = null; public SocketProcessor(SocketChannel socket) { this.socket = socket; public void run() { // Process the request from this socket if (handler.process(socket) == Handler.SocketState.CLOSED) { // Close socket and pool try { socket.socket().close(); socket.close(); } catch ( Exception x ) { log.error("",x); } socket = null; } } } // --------------------------------------- SocketEventProcessor Inner Class /** * This class is the equivalent of the Worker, but will simply use in an * external Executor thread pool. */ protected class SocketEventProcessor implements Runnable { protected SocketChannel socket = null; protected boolean error = false; public SocketEventProcessor(SocketChannel socket, boolean error) { this.socket = socket; this.error = error; } public void run() { // Process the request from this socket if (handler.event(socket, error) == Handler.SocketState.CLOSED) { // Close socket and pool try { socket.socket().close(); socket.close(); } catch ( Exception x ) { log.error("",x); } socket = null; } } } }
File
NioEndpoint.java
Developer's decision
Version 2
Kind of conflict
Class declaration
Comment
Import
Package declaration