There is one engine for the Server and the Client. the send() and receive()
Methods are the basic operations the Sync, Async messageing are build on top.
| Method from org.apache.axis2.engine.AxisEngine Detail: |
public static MessageContext createFaultMessageContext(MessageContext processingContext,
Throwable e) throws AxisFault {
return MessageContextBuilder.createFaultMessageContext(processingContext, e);
} Deprecated! ( - post 1.1 branch)
This method is called to handle any error that occurs at inflow or outflow. But if the
method is called twice, it implies that sending the error handling has failed, in which case
the method logs the error and exists. |
public static InvocationResponse receive(MessageContext msgContext) throws AxisFault {
if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
log.trace(msgContext.getLogIDString() + " receive:" + msgContext.getMessageID());
}
ConfigurationContext confContext = msgContext.getConfigurationContext();
List< Phase > preCalculatedPhases;
if (msgContext.isFault() || msgContext.isProcessingFault()) {
preCalculatedPhases = confContext.getAxisConfiguration().getInFaultFlowPhases();
msgContext.setFLOW(MessageContext.IN_FAULT_FLOW);
} else {
preCalculatedPhases = confContext.getAxisConfiguration().getInFlowPhases();
msgContext.setFLOW(MessageContext.IN_FLOW);
}
// Set the initial execution chain in the MessageContext to a *copy* of what
// we got above. This allows individual message processing to change the chain without
// affecting later messages.
ArrayList< Handler > executionChain = new ArrayList< Handler >();
executionChain.addAll(preCalculatedPhases);
msgContext.setExecutionChain(executionChain);
try {
InvocationResponse pi = invoke(msgContext, NOT_RESUMING_EXECUTION);
if (pi.equals(InvocationResponse.CONTINUE)) {
checkMustUnderstand(msgContext);
if (msgContext.isServerSide()) {
// invoke the Message Receivers
MessageReceiver receiver = msgContext.getAxisOperation().getMessageReceiver();
if (receiver == null) {
throw new AxisFault(Messages.getMessage(
"nomessagereciever",
msgContext.getAxisOperation().getName().toString()));
}
receiver.receive(msgContext);
}
flowComplete(msgContext);
} else if (pi.equals(InvocationResponse.SUSPEND)) {
return pi;
} else if (pi.equals(InvocationResponse.ABORT)) {
flowComplete(msgContext);
// Undo any partial work.
// Remove the incoming message context
if (log.isDebugEnabled()) {
log.debug("InvocationResponse is aborted. " +
"The incoming MessageContext is removed, " +
"and the OperationContext is marked as incomplete");
}
AxisOperation axisOp = msgContext.getAxisOperation();
if(axisOp!=null){
String mepURI = axisOp.getMessageExchangePattern();
if (WSDL2Constants.MEP_URI_OUT_IN.equals(mepURI)) {
OperationContext opCtx = msgContext.getOperationContext();
if (opCtx != null) {
opCtx.removeMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
}
}
}
else{
log.debug("Could not clean up op ctx for " + msgContext);
}
return pi;
} else {
String errorMsg =
"Unrecognized InvocationResponse encountered in AxisEngine.receive()";
log.error(msgContext.getLogIDString() + " " + errorMsg);
throw new AxisFault(errorMsg);
}
}
catch (AxisFault e) {
log.error(e.getMessage(), e);
msgContext.setFailureReason(e);
flowComplete(msgContext);
throw e;
}
return InvocationResponse.CONTINUE;
}
This methods represents the inflow of the Axis, this could be either at the server side or the client side.
Here the ExecutionChain is created using the Phases. The Handlers at the each Phases is ordered in
deployment time by the deployment module |
public static InvocationResponse resume(MessageContext msgctx) throws AxisFault {
if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
log.trace(msgctx.getLogIDString() + " resume:" + msgctx.getMessageID());
}
msgctx.setPaused(false);
if (msgctx.getFLOW() == MessageContext.IN_FLOW) {
return resumeReceive(msgctx);
} else {
return resumeSend(msgctx);
}
}
Resume processing of a message. |
public static InvocationResponse resumeReceive(MessageContext msgContext) throws AxisFault {
if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
log.trace(msgContext.getLogIDString() + " resumeReceive:" + msgContext.getMessageID());
}
//REVIEW: This name is a little misleading, as it seems to indicate that there should be a resumeReceiveFault as well, when, in fact, this does both
//REVIEW: Unlike with receive, there is no wrapping try/catch clause which would
//fire off the flowComplete on an error, as we have to assume that the
//message will be resumed again, but perhaps we need to unwind back to
//the point at which the message was resumed and provide another API
//to allow the full unwind if the message is going to be discarded.
//invoke the phases
InvocationResponse pi = invoke(msgContext, RESUMING_EXECUTION);
//invoking the MR
if (pi.equals(InvocationResponse.CONTINUE)) {
checkMustUnderstand(msgContext);
if (msgContext.isServerSide()) {
// invoke the Message Receivers
MessageReceiver receiver = msgContext.getAxisOperation().getMessageReceiver();
if (receiver == null) {
throw new AxisFault(Messages.getMessage(
"nomessagereciever",
msgContext.getAxisOperation().getName().toString()));
}
receiver.receive(msgContext);
}
flowComplete(msgContext);
}
return pi;
}
If the msgConetext is puased and try to invoke then
first invoke the phase list and after the message receiver |
public static InvocationResponse resumeSend(MessageContext msgContext) throws AxisFault {
if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
log.trace(msgContext.getLogIDString() + " resumeSend:" + msgContext.getMessageID());
}
//REVIEW: This name is a little misleading, as it seems to indicate that there should be a resumeSendFault as well, when, in fact, this does both
//REVIEW: Unlike with send, there is no wrapping try/catch clause which would
//fire off the flowComplete on an error, as we have to assume that the
//message will be resumed again, but perhaps we need to unwind back to
//the point at which the message was resumed and provide another API
//to allow the full unwind if the message is going to be discarded.
//invoke the phases
InvocationResponse pi = invoke(msgContext, RESUMING_EXECUTION);
//Invoking Transport Sender
if (pi.equals(InvocationResponse.CONTINUE)) {
// write the Message to the Wire
TransportOutDescription transportOut = msgContext.getTransportOut();
TransportSender sender = transportOut.getSender();
sender.invoke(msgContext);
flowComplete(msgContext);
}
return pi;
}
To resume the invocation at the send path , this is neened since it is require to call
TransportSender at the end |
public static void resumeSendFault(MessageContext msgContext) throws AxisFault {
if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
log.trace(msgContext.getLogIDString() + " resumeSendFault:" + msgContext.getMessageID());
}
OperationContext opContext = msgContext.getOperationContext();
if (opContext != null) {
try {
InvocationResponse pi = invoke(msgContext, RESUMING_EXECUTION);
if (pi.equals(InvocationResponse.SUSPEND)) {
log.warn(msgContext.getLogIDString() +
" The resumption of this flow may function incorrectly, as the OutFaultFlow will not be used");
return;
} else if (pi.equals(InvocationResponse.ABORT)) {
flowComplete(msgContext);
return;
} else if (!pi.equals(InvocationResponse.CONTINUE)) {
String errorMsg =
"Unrecognized InvocationResponse encountered in AxisEngine.sendFault()";
log.error(msgContext.getLogIDString() + " " + errorMsg);
throw new AxisFault(errorMsg);
}
} catch (AxisFault e) {
msgContext.setFailureReason(e);
flowComplete(msgContext);
throw e;
}
}
ArrayList< Handler > executionChain = new ArrayList< Handler >(msgContext.getConfigurationContext()
.getAxisConfiguration().getOutFaultFlowPhases());
msgContext.setExecutionChain(executionChain);
msgContext.setFLOW(MessageContext.OUT_FAULT_FLOW);
InvocationResponse pi = invoke(msgContext, NOT_RESUMING_EXECUTION);
if (pi.equals(InvocationResponse.CONTINUE)) {
// Actually send the SOAP Fault
TransportOutDescription transportOut = msgContext.getTransportOut();
if (transportOut == null) {
throw new AxisFault("Transport out has not been set");
}
TransportSender sender = transportOut.getSender();
sender.invoke(msgContext);
flowComplete(msgContext);
} else if (pi.equals(InvocationResponse.SUSPEND)) {
} else if (pi.equals(InvocationResponse.ABORT)) {
flowComplete(msgContext);
} else {
String errorMsg =
"Unrecognized InvocationResponse encountered in AxisEngine.sendFault()";
log.error(msgContext.getLogIDString() + " " + errorMsg);
throw new AxisFault(errorMsg);
}
}
here we assume that it is resume from an operation level handler |
public static void send(MessageContext msgContext) throws AxisFault {
if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
log.trace(msgContext.getLogIDString() + " send:" + msgContext.getMessageID());
}
// find and invoke the Phases
OperationContext operationContext = msgContext.getOperationContext();
ArrayList executionChain = operationContext.getAxisOperation().getPhasesOutFlow();
//rather than having two steps added both oparation and global chain together
ArrayList outPhases = new ArrayList();
outPhases.addAll(executionChain);
outPhases.addAll(msgContext.getConfigurationContext().getAxisConfiguration().getOutFlowPhases());
msgContext.setExecutionChain(outPhases);
msgContext.setFLOW(MessageContext.OUT_FLOW);
try {
InvocationResponse pi = invoke(msgContext, NOT_RESUMING_EXECUTION);
if (pi.equals(InvocationResponse.CONTINUE)) {
// write the Message to the Wire
TransportOutDescription transportOut = msgContext.getTransportOut();
if (transportOut == null) {
throw new AxisFault("Transport out has not been set");
}
TransportSender sender = transportOut.getSender();
// This boolean property only used in client side fireAndForget invocation
//It will set a property into message context and if some one has set the
//property then transport sender will invoke in a diffrent thread
Object isTransportNonBlocking = msgContext.getProperty(
MessageContext.TRANSPORT_NON_BLOCKING);
if (isTransportNonBlocking != null &&
((Boolean) isTransportNonBlocking).booleanValue()) {
msgContext.getConfigurationContext().getThreadPool().execute(
new TransportNonBlockingInvocationWorker(msgContext, sender));
} else {
sender.invoke(msgContext);
}
//REVIEW: In the case of the TransportNonBlockingInvocationWorker, does this need to wait until that finishes?
flowComplete(msgContext);
} else if (pi.equals(InvocationResponse.SUSPEND)) {
} else if (pi.equals(InvocationResponse.ABORT)) {
flowComplete(msgContext);
} else {
String errorMsg =
"Unrecognized InvocationResponse encountered in AxisEngine.send()";
log.error(msgContext.getLogIDString() + " " + errorMsg);
throw new AxisFault(errorMsg);
}
} catch (AxisFault e) {
msgContext.setFailureReason(e);
flowComplete(msgContext);
throw e;
}
}
This methods represents the outflow of the Axis, this could be either at the server side or the client side.
Here the ExecutionChain is created using the Phases. The Handlers at the each Phases is ordered in
deployment time by the deployment module |
public static void sendFault(MessageContext msgContext) throws AxisFault {
if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
log.trace(msgContext.getLogIDString() + " sendFault:" + msgContext.getMessageID());
}
OperationContext opContext = msgContext.getOperationContext();
//FIXME: If this gets paused in the operation-specific phases, the resume is not going to function correctly as the phases will not have all been set
// find and execute the Fault Out Flow Handlers
if (opContext != null) {
AxisOperation axisOperation = opContext.getAxisOperation();
ArrayList faultExecutionChain = axisOperation.getPhasesOutFaultFlow();
//adding both operation specific and global out fault flows.
ArrayList outFaultPhases = new ArrayList();
outFaultPhases.addAll((ArrayList) faultExecutionChain.clone());
msgContext.setExecutionChain((ArrayList) outFaultPhases.clone());
msgContext.setFLOW(MessageContext.OUT_FAULT_FLOW);
try {
InvocationResponse pi = invoke(msgContext, NOT_RESUMING_EXECUTION);
if (pi.equals(InvocationResponse.SUSPEND)) {
log.warn(msgContext.getLogIDString() +
" The resumption of this flow may function incorrectly, as the OutFaultFlow will not be used");
return;
} else if (pi.equals(InvocationResponse.ABORT)) {
flowComplete(msgContext);
return;
} else if (!pi.equals(InvocationResponse.CONTINUE)) {
String errorMsg =
"Unrecognized InvocationResponse encountered in AxisEngine.sendFault()";
log.error(msgContext.getLogIDString() + " " + errorMsg);
throw new AxisFault(errorMsg);
}
}
catch (AxisFault e) {
msgContext.setFailureReason(e);
flowComplete(msgContext);
throw e;
}
}
ArrayList< Handler > executionChain = new ArrayList< Handler >(msgContext.getConfigurationContext()
.getAxisConfiguration().getOutFaultFlowPhases());
msgContext.setExecutionChain(executionChain);
msgContext.setFLOW(MessageContext.OUT_FAULT_FLOW);
InvocationResponse pi = invoke(msgContext, NOT_RESUMING_EXECUTION);
if (pi.equals(InvocationResponse.CONTINUE)) {
// Actually send the SOAP Fault
TransportOutDescription transportOut = msgContext.getTransportOut();
if (transportOut == null) {
throw new AxisFault("Transport out has not been set");
}
TransportSender sender = transportOut.getSender();
sender.invoke(msgContext);
flowComplete(msgContext);
} else if (pi.equals(InvocationResponse.SUSPEND)) {
} else if (pi.equals(InvocationResponse.ABORT)) {
flowComplete(msgContext);
} else {
String errorMsg =
"Unrecognized InvocationResponse encountered in AxisEngine.sendFault()";
log.error(msgContext.getLogIDString() + " " + errorMsg);
throw new AxisFault(errorMsg);
}
}
Sends the SOAP Fault to another SOAP node. |