private static final long serialVersionUID = -5533042142349963796L;
<<<<<<< HEAD
private final transient SerializedObject serializedObject;
private final transient Serializer payloadSerializer;
private transient volatile Class> objectType;
private volatile T deserialized;
=======
private transient Serializer serializer;
private transient LazyUpcastingObject serializedObject;
private transient int indexOnDeserializedObject;
private transient volatile Class deserializedObjectType;
private volatile T deserializedObject;
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f
/**
* Creates an instance with the given deserializedObject object instance. Using this constructor will
Solution content
private static final long serialVersionUID = -5533042142349963796L;
private transient Serializer serializer;
private transient LazyUpcastingObject serializedObject;
private transient int indexOnDeserializedObject;
private transient volatile Class deserializedObjectType;
private volatile T deserializedObject;
/**
* Creates an instance with the given deserializedObject object instance. Using this constructor will
File
LazyDeserializingObject.java
Developer's decision
Version 2
Kind of conflict
Attribute
Chunk
Conflicting content
<<<<<<< HEAD
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.eventstore;
import org.axonframework.domain.DomainEventMessage;
import java.util.Map;
import org.axonframework.serializer.Serializer;
import org.joda.time.DateTime;
import org.axonframework.domain.GenericDomainEventMessage;
import org.axonframework.domain.MetaData;
/**
 * DomainEventMessage implementation that is optimized to cope with serialized Payload and MetaData. The Payload and
 * MetaData will only be deserialized when requested. This means that a loaded event for which there is no handler
 * will never be deserialized.
 *
 * This implementation is Serializable as per Java specification. Both MetaData and Payload are deserialized prior to
 * being written to the OutputStream.
 *
 * @param <T> The type of payload contained in this message
 * @author Allard Buijze
 * @since 2.0
 */
public class SerializedDomainEventMessage<T> implements DomainEventMessage<T> {

    private static final long serialVersionUID = 1946981128830316529L;

    private final long sequenceNumber;
    private final Object aggregateIdentifier;
    private final String eventIdentifier;
    private final DateTime timestamp;
    private final transient LazyDeserializingObject<MetaData> serializedMetaData; // NOSONAR
    private final transient LazyDeserializingObject<T> serializedPayload; // NOSONAR

    /**
     * Creates a new instance with given serialized <code>data</code>, with data to be deserialized with given
     * <code>payloadSerializer</code> and <code>metaDataSerializer</code>.
     *
     * @param data               The serialized data for this EventMessage
     * @param payloadSerializer  The serializer to deserialize the payload data with
     * @param metaDataSerializer The serializer to deserialize meta data with
     */
    public SerializedDomainEventMessage(SerializedDomainEventData data, Serializer payloadSerializer,
                                        Serializer metaDataSerializer) {
        this(data.getEventIdentifier(),
             data.getAggregateIdentifier(),
             data.getSequenceNumber(),
             data.getTimestamp(),
             new LazyDeserializingObject<T>(data.getPayload(), payloadSerializer),
             new LazyDeserializingObject<MetaData>(data.getMetaData(), metaDataSerializer));
    }

    /**
     * Creates a new instance with given event details, whose payload and meta data are supplied as
     * {@link LazyDeserializingObject} wrappers.
     *
     * @param eventIdentifier     The identifier of the EventMessage
     * @param aggregateIdentifier The identifier of the Aggregate this message originates from
     * @param sequenceNumber      The sequence number that represents the order in which the message is generated
     * @param timestamp           The timestamp of (original) message creation
     * @param serializedPayload   The serialized payload of this message
     * @param serializedMetaData  The serialized meta data of the message
     */
    public SerializedDomainEventMessage(String eventIdentifier, Object aggregateIdentifier,
                                        long sequenceNumber, DateTime timestamp,
                                        LazyDeserializingObject<T> serializedPayload,
                                        LazyDeserializingObject<MetaData> serializedMetaData) {
        this.sequenceNumber = sequenceNumber;
        this.aggregateIdentifier = aggregateIdentifier;
        this.eventIdentifier = eventIdentifier;
        this.timestamp = timestamp;
        this.serializedPayload = serializedPayload;
        this.serializedMetaData = serializedMetaData;
    }

    /**
     * Copies all event details and the (possibly still serialized) payload of <code>original</code>, replacing
     * its meta data with the given <code>metaData</code>.
     *
     * @param original The message to copy identifier, timestamp, aggregate details and payload from
     * @param metaData The meta data for the new message
     */
    private SerializedDomainEventMessage(SerializedDomainEventMessage<T> original, Map metaData) {
        this.serializedMetaData = new LazyDeserializingObject<MetaData>(MetaData.from(metaData));
        this.aggregateIdentifier = original.getAggregateIdentifier();
        this.sequenceNumber = original.getSequenceNumber();
        this.eventIdentifier = original.getIdentifier();
        this.timestamp = original.getTimestamp();
        // The lazy payload wrapper is shared with the original, not copied.
        this.serializedPayload = original.serializedPayload;
    }

    @Override
    public long getSequenceNumber() {
        return sequenceNumber;
    }

    @Override
    public Object getAggregateIdentifier() {
        return aggregateIdentifier;
    }

    @Override
    public String getIdentifier() {
        return eventIdentifier;
    }

    @Override
    public DateTime getTimestamp() {
        return timestamp;
    }

    /**
     * Returns the meta data of this message, or an empty MetaData instance when the lazy wrapper yields
     * <code>null</code>.
     */
    @Override
    public MetaData getMetaData() {
        MetaData metaData = serializedMetaData.getObject();
        return metaData == null ? MetaData.emptyInstance() : metaData;
    }

    @SuppressWarnings({"unchecked"})
    @Override
    public T getPayload() {
        return serializedPayload.getObject();
    }

    @Override
    public Class getPayloadType() {
        return serializedPayload.getType();
    }

    /**
     * Returns a message carrying the given <code>newMetaData</code>. When the payload has already been
     * deserialized a {@link GenericDomainEventMessage} is returned; otherwise a copy of this message that keeps
     * the payload wrapped in its {@link LazyDeserializingObject}.
     */
    @Override
    public DomainEventMessage<T> withMetaData(Map newMetaData) {
        if (serializedPayload.isDeserialized()) {
            // NOTE(review): only aggregateIdentifier, sequenceNumber, payload and meta data are forwarded here;
            // eventIdentifier and timestamp are not. Confirm against the GenericDomainEventMessage constructors
            // that the returned message keeps this message's identifier and timestamp.
            return new GenericDomainEventMessage<T>(aggregateIdentifier, sequenceNumber,
                                                    serializedPayload.getObject(), newMetaData);
        } else {
            return new SerializedDomainEventMessage<T>(this, newMetaData);
        }
    }

    /**
     * {@inheritDoc}
     *
     * This method will force the MetaData to be deserialized if not already done.
     */
    @Override
    public DomainEventMessage<T> andMetaData(Map additionalMetaData) {
        MetaData newMetaData = getMetaData().mergedWith(additionalMetaData);
        return withMetaData(newMetaData);
    }
}
=======
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.eventstore;
import org.axonframework.domain.DomainEventMessage;
import org.axonframework.domain.GenericDomainEventMessage;
import org.axonframework.domain.MetaData;
import org.axonframework.serializer.SerializedObject;
import org.axonframework.serializer.Serializer;
import org.axonframework.upcasting.UpcasterChain;
import org.joda.time.DateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* DomainEventMessage implementation that is optimized to cope with serialized Payload and MetaData. The Payload and
* MetaData will only be deserialized when requested. This means that loaded event for which there is no handler will
* never be deserialized.
*
* This implementation is Serializable as per Java specification. Both MetaData and Payload are deserialized prior to
* being written to the OutputStream.
*
* @param <T> The type of payload contained in this message
* @author Allard Buijze
* @author Frank Versnel
* @since 2.0
*/
public class SerializedDomainEventMessage implements DomainEventMessage {
private static final long serialVersionUID = 1946981128830316529L;
private final long sequenceNumber;
private final Object aggregateIdentifier;
private final String eventIdentifier;
private final DateTime timestamp;
private transient final LazyDeserializingObject serializedMetaData;
private transient final LazyDeserializingObject serializedPayload;
/**
 * Builds a message from the given event details. Payload and meta data are handed over as
 * {@link LazyDeserializingObject} wrappers and stored as supplied.
 *
 * @param eventIdentifier     The identifier of the EventMessage
 * @param aggregateIdentifier The identifier of the Aggregate this message originates from
 * @param sequenceNumber      The sequence number that represents the order in which the message is generated
 * @param timestamp           The timestamp of (original) message creation
 * @param serializedPayload   The serialized payload of this message
 * @param serializedMetaData  The serialized meta data of the message
 */
public SerializedDomainEventMessage(String eventIdentifier, Object aggregateIdentifier,
                                    long sequenceNumber, DateTime timestamp,
                                    LazyDeserializingObject serializedPayload,
                                    LazyDeserializingObject serializedMetaData) {
    // Identity of the event itself.
    this.eventIdentifier = eventIdentifier;
    this.timestamp = timestamp;
    // Position of the event within its aggregate.
    this.aggregateIdentifier = aggregateIdentifier;
    this.sequenceNumber = sequenceNumber;
    // Content, kept in its wrapped form.
    this.serializedMetaData = serializedMetaData;
    this.serializedPayload = serializedPayload;
}
/**
 * Copy constructor that takes every event detail and the payload wrapper from <code>original</code>, but
 * wraps the given <code>metaData</code> as the new message's meta data.
 *
 * @param original The message to copy from
 * @param metaData The meta data for the new message
 */
private SerializedDomainEventMessage(SerializedDomainEventMessage original, Map metaData) {
    // Converted first, exactly as before, so a failure here happens before anything else is read.
    this.serializedMetaData = new LazyDeserializingObject(MetaData.from(metaData));
    // The payload wrapper is shared with the original, not duplicated.
    this.serializedPayload = original.serializedPayload;
    this.eventIdentifier = original.getIdentifier();
    this.timestamp = original.getTimestamp();
    this.aggregateIdentifier = original.getAggregateIdentifier();
    this.sequenceNumber = original.getSequenceNumber();
}
/**
 * Unpacks the given serialized <code>data</code> into its individual event details and delegates to the
 * detail-based factory, using separate serializers for payload and meta data.
 *
 * @param data               The serialized data for this EventMessage
 * @param payloadSerializer  The serializer to deserialize the payload data with
 * @param metaDataSerializer The serializer to deserialize meta data with
 * @param upcasterChain      Used for upcasting the given data
 * @return the newly created instances, where each instance represents a domain event in the serialized data
 */
public static List createDomainEventMessages(SerializedDomainEventData data,
                                             Serializer payloadSerializer,
                                             Serializer metaDataSerializer,
                                             UpcasterChain upcasterChain) {
    // Read the details in the same order in which they were previously passed as arguments.
    final String eventIdentifier = data.getEventIdentifier();
    final Object aggregateIdentifier = data.getAggregateIdentifier();
    final long sequenceNumber = data.getSequenceNumber();
    final DateTime timestamp = data.getTimestamp();
    final SerializedObject payload = data.getPayload();
    final SerializedObject metaData = data.getMetaData();
    return createDomainEventMessages(payloadSerializer, metaDataSerializer, eventIdentifier,
                                     aggregateIdentifier, sequenceNumber,
                                     timestamp, payload, metaData, upcasterChain);
}
/**
* Creates new instances with given event details,to be deserialized with given eventSerializer.
*
* @param eventSerializer The serializer to deserialize both the serializedObject and the
* serializedMetaData
* @param eventIdentifier The identifier of the EventMessage
* @param aggregateIdentifier The identifier of the Aggregate this message originates from
* @param sequenceNumber The sequence number that represents the order in which the message is generated
* @param timestamp The timestamp of (original) message creation
* @param serializedPayload The serialized payload of this message
* @param serializedMetaData The serialized meta data of the message
* @param upcasterChain Used for upcasting the given serializedPayload and serializedMetaData
* @return the newly created instances, where each instance represents a domain event in the serialized data
*/
public static List createDomainEventMessages(Serializer eventSerializer,
String eventIdentifier,
Object aggregateIdentifier,
long sequenceNumber,
}
DateTime timestamp,
SerializedObject serializedPayload,
SerializedObject serializedMetaData,
UpcasterChain upcasterChain) {
return createDomainEventMessages(eventSerializer,
eventSerializer,
eventIdentifier,
aggregateIdentifier,
sequenceNumber,
timestamp,
serializedPayload,
serializedMetaData,
upcasterChain);
}
private static List createDomainEventMessages(Serializer payloadSerializer,
Serializer metaDataSerializer,
String eventIdentifier,
Object aggregateIdentifier,
long sequenceNumber,
DateTime timeStamp,
SerializedObject serializedPayload,
SerializedObject serializedMetaData,
UpcasterChain upcasterChain) {
LazyUpcastingObject lazyUpcastedPayload = new LazyUpcastingObject(upcasterChain, serializedPayload);
LazyUpcastingObject lazyUpcastedMetaData = new LazyUpcastingObject(upcasterChain, serializedMetaData);
List lazyDeserializedDomainEvents = new ArrayList();
for (int eventIndex = 0; eventIndex < lazyUpcastedPayload.upcastedObjectCount(); eventIndex++) {
lazyDeserializedDomainEvents.add(
new SerializedDomainEventMessage(
eventIdentifier,
aggregateIdentifier,
sequenceNumber,
timeStamp,
new LazyDeserializingObject
Solution content
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.eventstore;
import org.axonframework.domain.DomainEventMessage;
import org.axonframework.domain.GenericDomainEventMessage;
import org.axonframework.domain.MetaData;
import org.axonframework.serializer.SerializedObject;
import org.axonframework.serializer.Serializer;
import org.axonframework.upcasting.UpcasterChain;
import org.joda.time.DateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * DomainEventMessage implementation that is optimized to cope with serialized Payload and MetaData. The Payload and
 * MetaData will only be deserialized when requested. This means that a loaded event for which there is no handler
 * will never be deserialized.
 *
 * This implementation is Serializable as per Java specification. Both MetaData and Payload are deserialized prior to
 * being written to the OutputStream.
 *
 * @param <T> The type of payload contained in this message
 * @author Allard Buijze
 * @author Frank Versnel
 * @since 2.0
 */
public class SerializedDomainEventMessage implements DomainEventMessage {
private static final long serialVersionUID = 1946981128830316529L;
private final long sequenceNumber;
private final Object aggregateIdentifier;
private final String eventIdentifier;
private final DateTime timestamp;
private final transient LazyDeserializingObject serializedMetaData; // NOSONAR
private final transient LazyDeserializingObject serializedPayload; // NOSONAR
/**
 * Initializes a message with the given event details. The payload and the meta data arrive wrapped in
 * {@link LazyDeserializingObject} instances, which are stored as given.
 *
 * @param eventIdentifier     The identifier of the EventMessage
 * @param aggregateIdentifier The identifier of the Aggregate this message originates from
 * @param sequenceNumber      The sequence number that represents the order in which the message is generated
 * @param timestamp           The timestamp of (original) message creation
 * @param serializedPayload   The serialized payload of this message
 * @param serializedMetaData  The serialized meta data of the message
 */
public SerializedDomainEventMessage(String eventIdentifier, Object aggregateIdentifier,
                                    long sequenceNumber, DateTime timestamp,
                                    LazyDeserializingObject serializedPayload,
                                    LazyDeserializingObject serializedMetaData) {
    // Wrapped content first, then the plain event details.
    this.serializedPayload = serializedPayload;
    this.serializedMetaData = serializedMetaData;
    this.eventIdentifier = eventIdentifier;
    this.aggregateIdentifier = aggregateIdentifier;
    this.sequenceNumber = sequenceNumber;
    this.timestamp = timestamp;
}
/**
 * Creates a copy of <code>original</code> that carries the given <code>metaData</code> instead of the
 * original's meta data. All other details, including the payload wrapper, are taken from the original.
 *
 * @param original The message whose details and payload are reused
 * @param metaData The meta data to attach to the copy
 */
private SerializedDomainEventMessage(SerializedDomainEventMessage original, Map metaData) {
    // The meta data conversion stays the first operation, as it was before.
    this.serializedMetaData = new LazyDeserializingObject(MetaData.from(metaData));
    this.sequenceNumber = original.getSequenceNumber();
    this.aggregateIdentifier = original.getAggregateIdentifier();
    this.timestamp = original.getTimestamp();
    this.eventIdentifier = original.getIdentifier();
    // Same wrapper instance as the original; the payload is not re-wrapped.
    this.serializedPayload = original.serializedPayload;
}
/**
 * Creates new instances from the given serialized <code>data</code>. The individual details are read from
 * <code>data</code> and passed on to the detail-based factory together with the two serializers.
 *
 * @param data               The serialized data for this EventMessage
 * @param payloadSerializer  The serializer to deserialize the payload data with
 * @param metaDataSerializer The serializer to deserialize meta data with
 * @param upcasterChain      Used for upcasting the given data
 * @return the newly created instances, where each instance represents a domain event in the serialized data
 */
public static List createDomainEventMessages(SerializedDomainEventData data,
                                             Serializer payloadSerializer,
                                             Serializer metaDataSerializer,
                                             UpcasterChain upcasterChain) {
    // Accessors are invoked in the original argument order.
    final String identifier = data.getEventIdentifier();
    final Object aggregateId = data.getAggregateIdentifier();
    final long sequence = data.getSequenceNumber();
    final DateTime createdAt = data.getTimestamp();
    final SerializedObject rawPayload = data.getPayload();
    final SerializedObject rawMetaData = data.getMetaData();
    return createDomainEventMessages(payloadSerializer, metaDataSerializer, identifier,
                                     aggregateId, sequence,
                                     createdAt, rawPayload, rawMetaData, upcasterChain);
}
/**
 * Creates new instances from the given event details, using the single <code>eventSerializer</code> for
 * both the payload and the meta data.
 *
 * @param eventSerializer     The serializer to deserialize both the serializedObject and the
 *                            serializedMetaData
 * @param eventIdentifier     The identifier of the EventMessage
 * @param aggregateIdentifier The identifier of the Aggregate this message originates from
 * @param sequenceNumber      The sequence number that represents the order in which the message is generated
 * @param timestamp           The timestamp of (original) message creation
 * @param serializedPayload   The serialized payload of this message
 * @param serializedMetaData  The serialized meta data of the message
 * @param upcasterChain       Used for upcasting the given serializedPayload and serializedMetaData
 * @return the newly created instances, where each instance represents a domain event in the serialized data
 */
public static List createDomainEventMessages(Serializer eventSerializer,
                                             String eventIdentifier,
                                             Object aggregateIdentifier,
                                             long sequenceNumber,
                                             DateTime timestamp,
                                             SerializedObject serializedPayload,
                                             SerializedObject serializedMetaData,
                                             UpcasterChain upcasterChain) {
    // One serializer plays both roles expected by the delegate.
    final Serializer payloadSerializer = eventSerializer;
    final Serializer metaDataSerializer = eventSerializer;
    return createDomainEventMessages(payloadSerializer, metaDataSerializer,
                                     eventIdentifier, aggregateIdentifier, sequenceNumber, timestamp,
                                     serializedPayload, serializedMetaData,
                                     upcasterChain);
}
private static List createDomainEventMessages(Serializer payloadSerializer,
Serializer metaDataSerializer,
String eventIdentifier,
Object aggregateIdentifier,
long sequenceNumber,
DateTime timeStamp,
SerializedObject serializedPayload,
SerializedObject serializedMetaData,
UpcasterChain upcasterChain) {
LazyUpcastingObject lazyUpcastedPayload = new LazyUpcastingObject(upcasterChain, serializedPayload);
LazyUpcastingObject lazyUpcastedMetaData = new LazyUpcastingObject(upcasterChain, serializedMetaData);
List lazyDeserializedDomainEvents = new ArrayList();
for (int eventIndex = 0; eventIndex < lazyUpcastedPayload.upcastedObjectCount(); eventIndex++) {
lazyDeserializedDomainEvents.add(
new SerializedDomainEventMessage(
eventIdentifier,
aggregateIdentifier,
sequenceNumber,
timeStamp,
new LazyDeserializingObject
File
SerializedDomainEventMessage.java
Developer's decision
Combination
Kind of conflict
Class declaration
Comment
Import
Package declaration
Chunk
Conflicting content
<<<<<<< HEAD
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.eventstore.jpa;
import org.axonframework.domain.DomainEventMessage;
import org.axonframework.domain.MetaData;
import org.axonframework.eventstore.LazyDeserializingObject;
import org.axonframework.eventstore.SerializedDomainEventData;
import org.axonframework.eventstore.SerializedDomainEventMessage;
import org.axonframework.serializer.SerializedMetaData;
import org.axonframework.serializer.SerializedObject;
import org.axonframework.serializer.Serializer;
import org.axonframework.serializer.SimpleSerializedObject;
import org.joda.time.DateTime;
import java.util.Arrays;
import javax.persistence.Basic;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.Lob;
import javax.persistence.MappedSuperclass;
import javax.persistence.UniqueConstraint;
/**
* Data needed by different types of event logs.
*
* @author Allard Buijze
* @since 0.5
*/
@MappedSuperclass
@UniqueConstraint(columnNames = {"eventIdentifier"})
abstract class AbstractEventEntry implements SerializedDomainEventData {
@Id
@GeneratedValue
private Long id;
@Basic
private String eventIdentifier;
@Basic
private String aggregateIdentifier;
@Basic
private long sequenceNumber;
@Basic
private String timeStamp;
@Basic
private String type;
@Basic
private String payloadType;
@Basic
private String payloadRevision;
@Basic
@Lob
private byte[] metaData;
@Basic
@Lob
private byte[] payload;
/**
 * Initializes an entry from the given event and its already serialized payload and meta data.
 *
 * @param type     The type identifier of the aggregate root the event belongs to
 * @param event    The event to store in the eventstore
 * @param payload  The serialized payload of the Event
 * @param metaData The serialized metaData of the Event
 */
protected AbstractEventEntry(String type, DomainEventMessage event,
                             SerializedObject payload, SerializedObject metaData) {
    this.type = type;

    // The reads from event, payload and metaData below keep their original relative order.
    this.eventIdentifier = event.getIdentifier();

    // Payload type information and the payload bytes; the array is stored as returned by getData().
    this.payloadType = payload.getType().getName();
    this.payloadRevision = payload.getType().getRevision();
    this.payload = payload.getData();

    // Aggregate identifier and timestamp are stored in their toString() form.
    this.aggregateIdentifier = event.getAggregateIdentifier().toString();
    this.sequenceNumber = event.getSequenceNumber();

    // The meta data bytes are copied rather than shared.
    this.metaData = Arrays.copyOf(metaData.getData(), metaData.getData().length);
    this.timeStamp = event.getTimestamp().toString();
}
/**
 * Default constructor, as required by JPA specification. Do not use directly!
 * It leaves every field at its default value.
 */
protected AbstractEventEntry() {
}
/**
* Reconstructs the DomainEvent using the given eventSerializer.
*
* @param eventSerializer The Serializer to deserialize the DomainEvent with.
* @return The deserialized domain event
*/
public DomainEventMessage> getDomainEvent(Serializer eventSerializer) {
return new SerializedDomainEventMessage
Solution content
@Basic
@Lob
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.eventstore.jpa;
import org.axonframework.domain.DomainEventMessage;
import org.axonframework.eventstore.SerializedDomainEventData;
import org.axonframework.eventstore.SerializedDomainEventMessage;
import org.axonframework.serializer.SerializedMetaData;
import org.axonframework.serializer.SerializedObject;
import org.axonframework.serializer.Serializer;
import org.axonframework.serializer.SimpleSerializedObject;
import org.axonframework.upcasting.UpcasterChain;
import org.joda.time.DateTime;
import java.util.Arrays;
import java.util.List;
import javax.persistence.Basic;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.Lob;
import javax.persistence.MappedSuperclass;
import javax.persistence.UniqueConstraint;
/**
 * Data needed by different types of event logs.
 *
 * @author Allard Buijze
 * @since 0.5
 */
@MappedSuperclass
@UniqueConstraint(columnNames = {"eventIdentifier"})
abstract class AbstractEventEntry implements SerializedDomainEventData {

    @Id
    @GeneratedValue
    private Long id;
    @Basic
    private String eventIdentifier;
    @Basic
    private String aggregateIdentifier;
    @Basic
    private long sequenceNumber;
    @Basic
    private String timeStamp;
    @Basic
    private String type;
    @Basic
    private String payloadType;
    @Basic
    private String payloadRevision;
    @Basic
    @Lob
    private byte[] metaData;
    // Mapped as a large object, like metaData, so the serialized payload is not limited to a default-sized column.
    @Basic
    @Lob
    private byte[] payload;

    /**
     * Initialize an Event entry for the given <code>event</code>.
     *
     * @param type     The type identifier of the aggregate root the event belongs to
     * @param event    The event to store in the eventstore
     * @param payload  The serialized payload of the Event
     * @param metaData The serialized metaData of the Event
     */
    protected AbstractEventEntry(String type, DomainEventMessage event,
                                 SerializedObject payload, SerializedObject metaData) {
        this.eventIdentifier = event.getIdentifier();
        this.type = type;
        this.payloadType = payload.getType().getName();
        this.payloadRevision = payload.getType().getRevision();
        this.payload = payload.getData();
        // Aggregate identifier and timestamp are stored in their toString() form.
        this.aggregateIdentifier = event.getAggregateIdentifier().toString();
        this.sequenceNumber = event.getSequenceNumber();
        this.metaData = Arrays.copyOf(metaData.getData(), metaData.getData().length);
        this.timeStamp = event.getTimestamp().toString();
    }

    /**
     * Default constructor, as required by JPA specification. Do not use directly!
     */
    protected AbstractEventEntry() {
    }

    /**
     * Reconstructs the DomainEvents stored in this entry using the given <code>eventSerializer</code>.
     *
     * @param eventSerializer The Serializer to deserialize the DomainEvent with.
     * @param upcasterChain   Set of upcasters to use when an event needs upcasting before de-serialization
     * @return The domain event messages created from this entry
     */
    public List getDomainEvents(Serializer eventSerializer, UpcasterChain upcasterChain) {
        return SerializedDomainEventMessage.createDomainEventMessages(eventSerializer,
                                                                      eventIdentifier,
                                                                      aggregateIdentifier,
                                                                      sequenceNumber,
                                                                      new DateTime(timeStamp),
                                                                      new SimpleSerializedObject(payload,
                                                                                                 byte[].class,
                                                                                                 payloadType,
                                                                                                 payloadRevision),
                                                                      new SerializedMetaData(metaData, byte[].class),
                                                                      upcasterChain);
    }

    /**
     * Returns the Aggregate Identifier of the associated event.
     *
     * @return the Aggregate Identifier of the associated event.
     */
    @Override
    public Object getAggregateIdentifier() {
        return aggregateIdentifier;
    }

    /**
     * Returns the type identifier of the aggregate.
     *
     * @return the type identifier of the aggregate.
     */
    public String getType() {
        return type;
    }

    /**
     * Returns the sequence number of the associated event.
     *
     * @return the sequence number of the associated event.
     */
    @Override
    public long getSequenceNumber() {
        return sequenceNumber;
    }

    /**
     * Returns the time stamp of the associated event.
     *
     * @return the time stamp of the associated event.
     */
    @Override
    public DateTime getTimestamp() {
        return new DateTime(timeStamp);
    }

    /**
     * Returns the identifier of the associated event.
     *
     * @return the identifier of the associated event.
     */
    @Override
    public String getEventIdentifier() {
        return eventIdentifier;
    }

    /**
     * Returns the database-generated identifier for this entry.
     *
     * @return the database-generated identifier for this entry
     */
    public Long getId() {
        return id;
    }

    /**
     * Returns the stored payload bytes wrapped in a new SimpleSerializedObject, together with the stored
     * payload type and revision.
     *
     * @return the serialized payload of the associated event
     */
    @Override
    public SerializedObject getPayload() {
        return new SimpleSerializedObject(payload, byte[].class, payloadType, payloadRevision);
    }

    /**
     * Returns the stored meta data bytes wrapped in a new SerializedMetaData instance.
     *
     * @return the serialized meta data of the associated event
     */
    @Override
    public SerializedObject getMetaData() {
        return new SerializedMetaData(metaData, byte[].class);
    }
}
File
AbstractEventEntry.java
Developer's decision
Combination
Kind of conflict
Annotation
Class declaration
Comment
Import
Package declaration
Chunk
Conflicting content
<<<<<<< HEAD
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.eventstore.jpa;
import org.axonframework.common.jpa.EntityManagerProvider;
import org.axonframework.domain.DomainEventMessage;
import org.axonframework.domain.DomainEventStream;
import org.axonframework.domain.GenericDomainEventMessage;
import org.axonframework.domain.MetaData;
import org.axonframework.eventstore.EventStreamNotFoundException;
import org.axonframework.eventstore.EventVisitor;
import org.axonframework.eventstore.LazyDeserializingObject;
import org.axonframework.eventstore.SerializedDomainEventData;
import org.axonframework.eventstore.SerializedDomainEventMessage;
import org.axonframework.eventstore.SnapshotEventStore;
import org.axonframework.eventstore.jpa.criteria.JpaCriteria;
import org.axonframework.eventstore.jpa.criteria.JpaCriteriaBuilder;
import org.axonframework.eventstore.jpa.criteria.ParameterRegistry;
import org.axonframework.eventstore.management.Criteria;
import org.axonframework.eventstore.management.CriteriaBuilder;
import org.axonframework.eventstore.management.EventStoreManagement;
import org.axonframework.repository.ConcurrencyException;
import org.axonframework.serializer.Serializer;
import org.axonframework.serializer.xml.XStreamSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.persistence.EntityManager;
import javax.sql.DataSource;
import static org.axonframework.common.IdentifierValidator.validateIdentifier;
/**
* An EventStore implementation that uses JPA to store DomainEvents in a database. The actual DomainEvent is stored as
* a serialized blob of bytes. Other columns are used to store meta-data that allow quick finding of DomainEvents for
* a specific aggregate in the correct order.
*
* This EventStore supports snapshot pruning, which can be enabled by configuring a {@link #setMaxSnapshotsArchived(int)
* maximum number of snapshots to archive}. By default snapshot pruning is configured to archive only {@value
* #DEFAULT_MAX_SNAPSHOTS_ARCHIVED} snapshot per aggregate.
*
* The serializer used to serialize the events is configurable. By default, the {@link XStreamSerializer} is used.
*
* @author Allard Buijze
* @since 0.5
*/
public class JpaEventStore implements SnapshotEventStore, EventStoreManagement {
private static final Logger logger = LoggerFactory.getLogger(JpaEventStore.class);
private static final int DEFAULT_BATCH_SIZE = 100;
private static final int DEFAULT_MAX_SNAPSHOTS_ARCHIVED = 1;
private final EntityManagerProvider entityManagerProvider;
private final Serializer eventSerializer;
private final EventEntryStore eventEntryStore;
private int batchSize = DEFAULT_BATCH_SIZE;
private int maxSnapshotsArchived = DEFAULT_MAX_SNAPSHOTS_ARCHIVED;
private PersistenceExceptionResolver persistenceExceptionResolver;
/**
* Initialize a JpaEventStore using an {@link org.axonframework.serializer.xml.XStreamSerializer}, which
* serializes events as XML and the default Event Entry store.
*
* The JPA Persistence context is required to contain two entities: {@link DomainEventEntry} and {@link
* SnapshotEventEntry}.
*
* @param entityManagerProvider The EntityManagerProvider providing the EntityManager instance for this EventStore
*/
public JpaEventStore(EntityManagerProvider entityManagerProvider) {
// Delegate to the three-argument constructor with a new XStreamSerializer and a new DefaultEventEntryStore.
this(entityManagerProvider, new XStreamSerializer(), new DefaultEventEntryStore());
}
/**
* Initialize a JpaEventStore using the given eventEntryStore and an {@link
* org.axonframework.serializer.xml.XStreamSerializer}, which serializes events as XML.
*
* @param entityManagerProvider The EntityManagerProvider providing the EntityManager instance for this EventStore
* @param eventEntryStore The instance providing persistence logic for Domain Event entries
*/
public JpaEventStore(EntityManagerProvider entityManagerProvider, EventEntryStore eventEntryStore) {
// Delegate to the three-argument constructor, supplying a new XStreamSerializer as the serializer.
this(entityManagerProvider, new XStreamSerializer(), eventEntryStore);
}
/**
* Initialize a JpaEventStore which serializes events using the given eventSerializer and stores the
* events in the database using the default EventEntryStore.
*
* @param entityManagerProvider The EntityManagerProvider providing the EntityManager instance for this EventStore
* @param eventSerializer The serializer to (de)serialize domain events with.
*/
public JpaEventStore(EntityManagerProvider entityManagerProvider,
Serializer eventSerializer) {
// Delegate to the three-argument constructor, supplying a new DefaultEventEntryStore.
this(entityManagerProvider, eventSerializer, new DefaultEventEntryStore());
}
/**
* Initialize a JpaEventStore which serializes events using the given eventSerializer and stores the
* events in the database using the given eventEntryStore.
*
* @param entityManagerProvider The EntityManagerProvider providing the EntityManager instance for this EventStore
* @param eventSerializer The serializer to (de)serialize domain events with.
* @param eventEntryStore The instance providing persistence logic for Domain Event entries
*/
public JpaEventStore(EntityManagerProvider entityManagerProvider,
Serializer eventSerializer, EventEntryStore eventEntryStore) {
// The collaborators are stored exactly as given; no validation or copying happens here.
this.entityManagerProvider = entityManagerProvider;
this.eventSerializer = eventSerializer;
this.eventEntryStore = eventEntryStore;
}
/**
 * {@inheritDoc}
 * <p/>
 * The payload and meta-data of each event are serialized to a byte array and handed to the
 * {@link EventEntryStore}; once the stream is exhausted the EntityManager is flushed. When a
 * {@link PersistenceExceptionResolver} is configured and it classifies a failure as a duplicate key violation,
 * the failure is rethrown as a {@link ConcurrencyException} carrying the original exception as its cause. Any
 * other runtime failure is rethrown unchanged.
 */
@Override
public void appendEvents(String type, DomainEventStream events) {
    DomainEventMessage event = null;
    try {
        EntityManager entityManager = entityManagerProvider.getEntityManager();
        while (events.hasNext()) {
            event = events.next();
            validateIdentifier(event.getAggregateIdentifier().getClass());
            eventEntryStore.persistEvent(type, event, eventSerializer.serialize(event.getPayload(), byte[].class),
                                         eventSerializer.serialize(event.getMetaData(), byte[].class),
                                         entityManager);
        }
        entityManager.flush();
    } catch (RuntimeException exception) {
        if (persistenceExceptionResolver != null
                && persistenceExceptionResolver.isDuplicateKeyViolation(exception)) {
            // event is still null when the failure occurred before any event was read from the stream
            // (for instance while flushing after an empty stream). Use placeholders in that case rather than
            // letting a NullPointerException hide the real cause.
            final String aggregateIdentifier =
                    event == null ? "unknown" : String.valueOf(event.getAggregateIdentifier());
            final String sequenceNumber =
                    event == null ? "unknown" : String.valueOf(event.getSequenceNumber());
            throw new ConcurrencyException(
                    String.format("Concurrent modification detected for Aggregate identifier [%s], sequence: [%s]",
                                  aggregateIdentifier,
                                  sequenceNumber),
                    exception);
        }
        throw exception;
    }
}
/**
* {@inheritDoc}
*/
@SuppressWarnings({"unchecked"})
@Override
public DomainEventStream readEvents(String type, Object identifier) {
    // Start from the latest snapshot when it can be read; otherwise replay from sequence number 0.
    long lastSnapshotSequence = -1;
    DomainEventMessage snapshotMessage = null;
    SerializedDomainEventData storedSnapshot =
            eventEntryStore.loadLastSnapshotEvent(type, identifier, entityManagerProvider.getEntityManager());
    if (storedSnapshot != null) {
        try {
            snapshotMessage = new GenericDomainEventMessage(
                    identifier,
                    storedSnapshot.getSequenceNumber(),
                    eventSerializer.deserialize(storedSnapshot.getPayload()),
                    (Map) eventSerializer.deserialize(storedSnapshot.getMetaData()));
            lastSnapshotSequence = snapshotMessage.getSequenceNumber();
        } catch (RuntimeException ex) {
            // An unreadable snapshot is not fatal: log it and continue without a snapshot.
            logger.warn("Error while reading snapshot event entry. "
                                + "Reconstructing aggregate on entire event stream. Caused by: {} {}",
                        ex.getClass().getName(),
                        ex.getMessage());
        } catch (LinkageError error) {
            // Linkage problems while loading the snapshot's classes are handled like any other read failure.
            logger.warn("Error while reading snapshot event entry. "
                                + "Reconstructing aggregate on entire event stream. Caused by: {} {}",
                        error.getClass().getName(),
                        error.getMessage());
        }
    }

    // First batch of regular events, beginning right after the snapshot (or at 0 without one).
    List firstBatch = fetchBatch(type, identifier, lastSnapshotSequence + 1);
    if (snapshotMessage != null) {
        firstBatch.add(0, snapshotMessage);
    }
    if (firstBatch.isEmpty()) {
        throw new EventStreamNotFoundException(type, identifier);
    }
    return new BatchingDomainEventStream(firstBatch, identifier, type);
}
@SuppressWarnings({"unchecked"})
private List fetchBatch(String type, Object identifier, long firstSequenceNumber) {
EntityManager entityManager = entityManagerProvider.getEntityManager();
List extends SerializedDomainEventData> entries = eventEntryStore.fetchBatch(type,
identifier,
firstSequenceNumber,
batchSize,
entityManager);
List events = new ArrayList(entries.size());
for (SerializedDomainEventData entry : entries) {
events.add(new SerializedDomainEventMessage(entry.getEventIdentifier(),
identifier,
entry.getSequenceNumber(),
entry.getTimestamp(),
new LazyDeserializingObject(entry.getPayload(),
eventSerializer),
new LazyDeserializingObject(entry.getMetaData(),
eventSerializer)));
}
return events;
}
/**
* {@inheritDoc}
*
* Upon appending a snapshot, this particular EventStore implementation also prunes snapshots which are considered
* redundant because they fall outside of the range of maximum snapshots to archive.
*/
@Override
public void appendSnapshotEvent(String type, DomainEventMessage snapshotEvent) {
    final EntityManager persistingEntityManager = entityManagerProvider.getEntityManager();
    // The new snapshot is stored first and older ones are pruned afterwards. In the opposite order an
    // aggregate reload under READ_UNCOMMITTED isolation could find no snapshot at all.
    eventEntryStore.persistSnapshot(type,
                                    snapshotEvent,
                                    eventSerializer.serialize(snapshotEvent.getPayload(), byte[].class),
                                    eventSerializer.serialize(snapshotEvent.getMetaData(), byte[].class),
                                    persistingEntityManager);
    if (maxSnapshotsArchived <= 0) {
        // Pruning is switched off.
        return;
    }
    eventEntryStore.pruneSnapshots(type,
                                   snapshotEvent,
                                   maxSnapshotsArchived,
                                   entityManagerProvider.getEntityManager());
}
@Override
public void visitEvents(EventVisitor visitor) {
// No criteria given: hand doVisitEvents a null where-clause and an empty parameter map.
doVisitEvents(visitor, null, Collections.emptyMap());
}
@Override
public void visitEvents(Criteria criteria, EventVisitor visitor) {
    // Let the criteria render itself into a where-clause for entry alias "e", collecting its parameters.
    final ParameterRegistry registry = new ParameterRegistry();
    final StringBuilder whereClause = new StringBuilder();
    final JpaCriteria jpaCriteria = (JpaCriteria) criteria;
    jpaCriteria.parse("e", whereClause, registry);
    doVisitEvents(visitor, whereClause.toString(), registry.getParameters());
}
@Override
public CriteriaBuilder newCriteriaBuilder() {
// Every call hands out a new JpaCriteriaBuilder instance.
return new JpaCriteriaBuilder();
}
private void doVisitEvents(EventVisitor visitor, String whereClause, Map parameters) {
EntityManager entityManager = entityManagerProvider.getEntityManager();
int first = 0;
List extends SerializedDomainEventData> batch;
boolean shouldContinue = true;
while (shouldContinue) {
batch = eventEntryStore.fetchFilteredBatch(whereClause, parameters,
first, batchSize, entityManager);
for (SerializedDomainEventData entry : batch) {
visitor.doWithEvent(new SerializedDomainEventMessage(entry, eventSerializer, eventSerializer));
}
shouldContinue = (batch.size() >= batchSize);
first += batchSize;
}
}
/**
* Registers the data source that allows the EventStore to detect the database type and define the error codes that
* represent concurrent access failures.
*
* Should not be used in combination with {@link #setPersistenceExceptionResolver(PersistenceExceptionResolver)},
* but rather as a shorthand alternative for most common database types.
*
* @param dataSource A data source providing access to the backing database
* @throws SQLException If an error occurs while accessing the dataSource
*/
public void setDataSource(DataSource dataSource) throws SQLException {
    // A resolver that is already in place wins; the data source is then ignored.
    if (persistenceExceptionResolver != null) {
        return;
    }
    persistenceExceptionResolver = new SQLErrorCodesResolver(dataSource);
}
/**
* Sets the persistenceExceptionResolver that will help detect concurrency exceptions from the backing database.
*
* @param persistenceExceptionResolver the persistenceExceptionResolver that will help detect concurrency
* exceptions
*/
public void setPersistenceExceptionResolver(PersistenceExceptionResolver persistenceExceptionResolver) {
// Unconditional assignment: replaces any resolver set before, including one created by setDataSource.
this.persistenceExceptionResolver = persistenceExceptionResolver;
}
/**
 * Sets the number of events that should be read at each database access. When more than this number of events must
 * be read to rebuild an aggregate's state, the events are read in batches of this size. Defaults to 100.
 *
 * Tip: if you use a snapshotter, make sure to choose snapshot trigger and batch size such that a single batch will
 * generally retrieve all events required to rebuild an aggregate's state.
 *
 * @param batchSize the number of events to read on each database access. Defaults to 100. Must be at least 1.
 * @throws IllegalArgumentException if the given batchSize is smaller than 1
 */
public void setBatchSize(int batchSize) {
    // With a batch size below 1 the "batch.size() >= batchSize" continuation check used while visiting
    // events is always true, so that loop would never end. Reject such values up front.
    if (batchSize < 1) {
        throw new IllegalArgumentException("batchSize must be at least 1, but was " + batchSize);
    }
    this.batchSize = batchSize;
}
/**
* Sets the maximum number of snapshots to archive for an aggregate. The EventStore will keep at most this number
* of
* snapshots per aggregate.
*
* Defaults to {@value #DEFAULT_MAX_SNAPSHOTS_ARCHIVED}.
*
* @param maxSnapshotsArchived The maximum number of snapshots to archive for an aggregate. A value less than 1
* disables pruning of snapshots.
*/
public void setMaxSnapshotsArchived(int maxSnapshotsArchived) {
// Stored as given; appendSnapshotEvent only prunes when this value is greater than 0.
this.maxSnapshotsArchived = maxSnapshotsArchived;
}
/**
 * DomainEventStream that walks through an initial batch of messages and asks the enclosing event store for a
 * follow-up batch whenever the current one is used up and was at least {@code batchSize} long. One message is
 * always held as look-ahead, which is what {@link #peek()} returns.
 */
private final class BatchingDomainEventStream implements DomainEventStream {

    private final Object id;
    private final String typeId;
    private Iterator currentBatch;
    private int currentBatchSize;
    private DomainEventMessage next;

    private BatchingDomainEventStream(List firstBatch, Object id, String typeId) {
        this.id = id;
        this.typeId = typeId;
        this.currentBatch = firstBatch.iterator();
        this.currentBatchSize = firstBatch.size();
        // Prime the look-ahead; it stays null for an empty first batch.
        if (currentBatch.hasNext()) {
            this.next = currentBatch.next();
        }
    }

    @Override
    public boolean hasNext() {
        return next != null;
    }

    @Override
    public DomainEventMessage next() {
        final DomainEventMessage result = next;
        // A batch shorter than batchSize is taken to be the final one, so no further fetch is attempted.
        if (result != null && !currentBatch.hasNext() && currentBatchSize >= batchSize) {
            logger.debug("Fetching new batch for Aggregate [{}]", id);
            final List followUpBatch = fetchBatch(typeId, id, result.getSequenceNumber() + 1);
            currentBatchSize = followUpBatch.size();
            currentBatch = followUpBatch.iterator();
        }
        // Advance the look-ahead; null marks the end of the stream.
        if (currentBatch.hasNext()) {
            next = currentBatch.next();
        } else {
            next = null;
        }
        return result;
    }

    @Override
    public DomainEventMessage peek() {
        return next;
    }
}
}
=======
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.eventstore.jpa;
import org.axonframework.common.jpa.EntityManagerProvider;
import org.axonframework.domain.DomainEventMessage;
import org.axonframework.domain.DomainEventStream;
import org.axonframework.domain.GenericDomainEventMessage;
import org.axonframework.eventstore.EventStreamNotFoundException;
import org.axonframework.eventstore.EventVisitor;
import org.axonframework.eventstore.SerializedDomainEventData;
import org.axonframework.eventstore.SerializedDomainEventMessage;
import org.axonframework.eventstore.SnapshotEventStore;
import org.axonframework.eventstore.jpa.criteria.JpaCriteria;
import org.axonframework.eventstore.jpa.criteria.JpaCriteriaBuilder;
import org.axonframework.eventstore.jpa.criteria.ParameterRegistry;
import org.axonframework.eventstore.management.Criteria;
import org.axonframework.eventstore.management.CriteriaBuilder;
import org.axonframework.eventstore.management.EventStoreManagement;
import org.axonframework.repository.ConcurrencyException;
import org.axonframework.serializer.Serializer;
import org.axonframework.serializer.XStreamSerializer;
import org.axonframework.upcasting.Upcaster;
import org.axonframework.upcasting.UpcasterChain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.persistence.EntityManager;
import javax.sql.DataSource;
import static org.axonframework.common.IdentifierValidator.validateIdentifier;
/**
* An EventStore implementation that uses JPA to store DomainEvents in a database. The actual DomainEvent is stored as
* a
* serialized blob of bytes. Other columns are used to store meta-data that allow quick finding of DomainEvents for a
* specific aggregate in the correct order.
*
* This EventStore supports snapshots pruning, which can be enabled by configuring a {@link #setMaxSnapshotsArchived(int)
* maximum number of snapshots to archive}. By default snapshot pruning is configured to archive only {@value
* #DEFAULT_MAX_SNAPSHOTS_ARCHIVED} snapshot per aggregate.
*
* The serializer used to serialize the events is configurable. By default, the {@link XStreamSerializer} is used.
*
* @author Allard Buijze
* @since 0.5
*/
public class JpaEventStore implements SnapshotEventStore, EventStoreManagement {
// Shared SLF4J logger for this event store.
private static final Logger logger = LoggerFactory.getLogger(JpaEventStore.class);
// Batch size in effect until setBatchSize(int) is called.
private static final int DEFAULT_BATCH_SIZE = 100;
// Snapshot archive limit in effect until setMaxSnapshotsArchived(int) is called.
private static final int DEFAULT_MAX_SNAPSHOTS_ARCHIVED = 1;
// Serializes payloads and meta-data on write and deserializes them on read.
private final Serializer eventSerializer;
// Performs the actual persisting and querying of event and snapshot entries.
private final EventEntryStore eventEntryStore;
// Passed to the entry store on each fetch and used to decide whether another batch must be read.
private int batchSize = DEFAULT_BATCH_SIZE;
// Handed to createDomainEventMessages when stored entries are read; UpcasterChain.EMPTY until setUpcasters is called.
private UpcasterChain upcasterChain = UpcasterChain.EMPTY;
// Supplies the EntityManager used by every read and write in this class.
private final EntityManagerProvider entityManagerProvider;
// Passed to pruneSnapshots after each snapshot; pruning is skipped unless this is greater than 0.
private int maxSnapshotsArchived = DEFAULT_MAX_SNAPSHOTS_ARCHIVED;
// Optional; while null, appendEvents rethrows persistence failures unchanged.
private PersistenceExceptionResolver persistenceExceptionResolver;
/**
* Initialize a JpaEventStore using an {@link org.axonframework.serializer.XStreamSerializer}, which
* serializes events as XML and the default Event Entry store.
*
* The JPA Persistence context is required to contain two entities: {@link DomainEventEntry} and {@link
* SnapshotEventEntry}.
*
* @param entityManagerProvider The EntityManagerProvider providing the EntityManager instance for this EventStore
*/
public JpaEventStore(EntityManagerProvider entityManagerProvider) {
// Delegate to the three-argument constructor with a new XStreamSerializer and a new DefaultEventEntryStore.
this(entityManagerProvider, new XStreamSerializer(), new DefaultEventEntryStore());
}
/**
* Initialize a JpaEventStore using the given eventEntryStore and an {@link
* org.axonframework.serializer.XStreamSerializer}, which serializes events as XML.
*
* @param entityManagerProvider The EntityManagerProvider providing the EntityManager instance for this EventStore
* @param eventEntryStore The instance providing persistence logic for Domain Event entries
*/
public JpaEventStore(EntityManagerProvider entityManagerProvider, EventEntryStore eventEntryStore) {
// Delegate to the three-argument constructor, supplying a new XStreamSerializer as the serializer.
this(entityManagerProvider, new XStreamSerializer(), eventEntryStore);
}
/**
* Initialize a JpaEventStore which serializes events using the given eventSerializer and stores the
* events in the database using the default EventEntryStore.
*
* @param entityManagerProvider The EntityManagerProvider providing the EntityManager instance for this EventStore
* @param eventSerializer The serializer to (de)serialize domain events with.
*/
public JpaEventStore(EntityManagerProvider entityManagerProvider,
Serializer eventSerializer) {
// Delegate to the three-argument constructor, supplying a new DefaultEventEntryStore.
this(entityManagerProvider, eventSerializer, new DefaultEventEntryStore());
}
/**
* Initialize a JpaEventStore which serializes events using the given eventSerializer and stores the
* events in the database using the given eventEntryStore.
*
* @param entityManagerProvider The EntityManagerProvider providing the EntityManager instance for this EventStore
* @param eventSerializer The serializer to (de)serialize domain events with.
* @param eventEntryStore The instance providing persistence logic for Domain Event entries
*/
public JpaEventStore(EntityManagerProvider entityManagerProvider,
Serializer eventSerializer, EventEntryStore eventEntryStore) {
// The collaborators are stored exactly as given; no validation or copying happens here.
this.entityManagerProvider = entityManagerProvider;
this.eventSerializer = eventSerializer;
this.eventEntryStore = eventEntryStore;
}
/**
 * {@inheritDoc}
 * <p/>
 * The payload and meta-data of each event are serialized to a byte array and handed to the
 * {@link EventEntryStore}; once the stream is exhausted the EntityManager is flushed. When a
 * {@link PersistenceExceptionResolver} is configured and it classifies a failure as a duplicate key violation,
 * the failure is rethrown as a {@link ConcurrencyException} carrying the original exception as its cause. Any
 * other runtime failure is rethrown unchanged.
 */
@Override
public void appendEvents(String type, DomainEventStream events) {
    DomainEventMessage event = null;
    try {
        EntityManager entityManager = entityManagerProvider.getEntityManager();
        while (events.hasNext()) {
            event = events.next();
            validateIdentifier(event.getAggregateIdentifier().getClass());
            eventEntryStore.persistEvent(type, event, eventSerializer.serialize(event.getPayload(), byte[].class),
                                         eventSerializer.serialize(event.getMetaData(), byte[].class),
                                         entityManager);
        }
        entityManager.flush();
    } catch (RuntimeException exception) {
        if (persistenceExceptionResolver != null
                && persistenceExceptionResolver.isDuplicateKeyViolation(exception)) {
            // event is still null when the failure occurred before any event was read from the stream
            // (for instance while flushing after an empty stream). Use placeholders in that case rather than
            // letting a NullPointerException hide the real cause.
            final String aggregateIdentifier =
                    event == null ? "unknown" : String.valueOf(event.getAggregateIdentifier());
            final String sequenceNumber =
                    event == null ? "unknown" : String.valueOf(event.getSequenceNumber());
            throw new ConcurrencyException(
                    String.format("Concurrent modification detected for Aggregate identifier [%s], sequence: [%s]",
                                  aggregateIdentifier,
                                  sequenceNumber),
                    exception);
        }
        throw exception;
    }
}
/**
* {@inheritDoc}
*/
@SuppressWarnings({"unchecked"})
@Override
public DomainEventStream readEvents(String type, Object identifier) {
    // Start from the latest snapshot when it can be read; otherwise replay from sequence number 0.
    long lastSnapshotSequence = -1;
    DomainEventMessage snapshotMessage = null;
    SerializedDomainEventData storedSnapshot =
            eventEntryStore.loadLastSnapshotEvent(type, identifier, entityManagerProvider.getEntityManager());
    if (storedSnapshot != null) {
        try {
            snapshotMessage = new GenericDomainEventMessage(
                    identifier,
                    storedSnapshot.getSequenceNumber(),
                    eventSerializer.deserialize(storedSnapshot.getPayload()),
                    (Map) eventSerializer.deserialize(storedSnapshot.getMetaData()));
            lastSnapshotSequence = snapshotMessage.getSequenceNumber();
        } catch (RuntimeException ex) {
            // An unreadable snapshot is not fatal: log it and continue without a snapshot.
            logger.warn("Error while reading snapshot event entry. "
                                + "Reconstructing aggregate on entire event stream. Caused by: {} {}",
                        ex.getClass().getName(),
                        ex.getMessage());
        } catch (LinkageError error) {
            // Linkage problems while loading the snapshot's classes are handled like any other read failure.
            logger.warn("Error while reading snapshot event entry. "
                                + "Reconstructing aggregate on entire event stream. Caused by: {} {}",
                        error.getClass().getName(),
                        error.getMessage());
        }
    }

    // First batch of regular events, beginning right after the snapshot (or at 0 without one).
    List firstBatch = fetchBatch(type, identifier, lastSnapshotSequence + 1);
    if (snapshotMessage != null) {
        firstBatch.add(0, snapshotMessage);
    }
    if (firstBatch.isEmpty()) {
        throw new EventStreamNotFoundException(type, identifier);
    }
    return new BatchingDomainEventStream(firstBatch, identifier, type);
}
@SuppressWarnings({"unchecked"})
private List fetchBatch(String type, Object identifier, long firstSequenceNumber) {
EntityManager entityManager = entityManagerProvider.getEntityManager();
List extends SerializedDomainEventData> entries = eventEntryStore.fetchBatch(type,
identifier,
firstSequenceNumber,
batchSize,
entityManager);
List events = new ArrayList(entries.size());
for (SerializedDomainEventData entry : entries) {
events.addAll(SerializedDomainEventMessage.createDomainEventMessages(eventSerializer,
entry.getEventIdentifier(),
identifier,
entry.getSequenceNumber(),
entry.getTimestamp(),
entry.getPayload(),
entry.getMetaData(),
upcasterChain));
}
return events;
}
/**
* {@inheritDoc}
*
* Upon appending a snapshot, this particular EventStore implementation also prunes snapshots which are considered
* redundant because they fall outside of the range of maximum snapshots to archive.
*/
@Override
public void appendSnapshotEvent(String type, DomainEventMessage snapshotEvent) {
    final EntityManager persistingEntityManager = entityManagerProvider.getEntityManager();
    // The new snapshot is stored first and older ones are pruned afterwards. In the opposite order an
    // aggregate reload under READ_UNCOMMITTED isolation could find no snapshot at all.
    eventEntryStore.persistSnapshot(type,
                                    snapshotEvent,
                                    eventSerializer.serialize(snapshotEvent.getPayload(), byte[].class),
                                    eventSerializer.serialize(snapshotEvent.getMetaData(), byte[].class),
                                    persistingEntityManager);
    if (maxSnapshotsArchived <= 0) {
        // Pruning is switched off.
        return;
    }
    eventEntryStore.pruneSnapshots(type,
                                   snapshotEvent,
                                   maxSnapshotsArchived,
                                   entityManagerProvider.getEntityManager());
}
@Override
public void visitEvents(EventVisitor visitor) {
// No criteria given: hand doVisitEvents a null where-clause and an empty parameter map.
doVisitEvents(visitor, null, Collections.emptyMap());
}
@Override
public void visitEvents(Criteria criteria, EventVisitor visitor) {
    // Let the criteria render itself into a where-clause for entry alias "e", collecting its parameters.
    final ParameterRegistry registry = new ParameterRegistry();
    final StringBuilder whereClause = new StringBuilder();
    final JpaCriteria jpaCriteria = (JpaCriteria) criteria;
    jpaCriteria.parse("e", whereClause, registry);
    doVisitEvents(visitor, whereClause.toString(), registry.getParameters());
}
@Override
public CriteriaBuilder newCriteriaBuilder() {
// Every call hands out a new JpaCriteriaBuilder instance.
return new JpaCriteriaBuilder();
}
private void doVisitEvents(EventVisitor visitor, String whereClause, Map parameters) {
EntityManager entityManager = entityManagerProvider.getEntityManager();
// TODO Batch processing is duplicated for JpaEventStore and MongoEventStore
// provide a more generic way of doing batch processing.
int first = 0;
List extends SerializedDomainEventData> batch;
boolean shouldContinue = true;
while (shouldContinue) {
batch = eventEntryStore.fetchFilteredBatch(whereClause, parameters, first, batchSize, entityManager);
for (SerializedDomainEventData entry : batch) {
List domainEventMessages =
SerializedDomainEventMessage.createDomainEventMessages(entry,
eventSerializer,
eventSerializer,
upcasterChain);
for (DomainEventMessage domainEventMessage : domainEventMessages) {
visitor.doWithEvent(domainEventMessage);
}
}
shouldContinue = (batch.size() >= batchSize);
first += batchSize;
}
}
/**
* Registers the data source that allows the EventStore to detect the database type and define the error codes that
* represent concurrent access failures.
*
* Should not be used in combination with {@link #setPersistenceExceptionResolver(PersistenceExceptionResolver)},
* but rather as a shorthand alternative for most common database types.
*
* @param dataSource A data source providing access to the backing database
* @throws SQLException If an error occurs while accessing the dataSource
*/
public void setDataSource(DataSource dataSource) throws SQLException {
    // A resolver that is already in place wins; the data source is then ignored.
    if (persistenceExceptionResolver != null) {
        return;
    }
    persistenceExceptionResolver = new SQLErrorCodesResolver(dataSource);
}
/**
* Sets the persistenceExceptionResolver that will help detect concurrency exceptions from the backing database.
*
* @param persistenceExceptionResolver the persistenceExceptionResolver that will help detect concurrency
* exceptions
*/
public void setPersistenceExceptionResolver(PersistenceExceptionResolver persistenceExceptionResolver) {
// Unconditional assignment: replaces any resolver set before, including one created by setDataSource.
this.persistenceExceptionResolver = persistenceExceptionResolver;
}
/**
 * Sets the number of events that should be read at each database access. When more than this number of events must
 * be read to rebuild an aggregate's state, the events are read in batches of this size. Defaults to 100.
 *
 * Tip: if you use a snapshotter, make sure to choose snapshot trigger and batch size such that a single batch will
 * generally retrieve all events required to rebuild an aggregate's state.
 *
 * @param batchSize the number of events to read on each database access. Defaults to 100. Must be at least 1.
 * @throws IllegalArgumentException if the given batchSize is smaller than 1
 */
public void setBatchSize(int batchSize) {
    // With a batch size below 1 the "batch.size() >= batchSize" continuation check used while visiting
    // events is always true, so that loop would never end. Reject such values up front.
    if (batchSize < 1) {
        throw new IllegalArgumentException("batchSize must be at least 1, but was " + batchSize);
    }
    this.batchSize = batchSize;
}
/**
* Sets the upcasters which allow older revisions of serialized objects to be deserialized. Upcasters are evaluated
* in the order they are provided in the given List. That means that you should take special precaution when an
* upcaster expects another upcaster to have processed an event.
*
* Any upcaster that relies on another upcaster doing its work first, should be placed after that other
* upcaster in the given list. Thus for any upcaster B that relies on upcaster A to do its work
* first, the following must be true: upcasters.indexOf(B) > upcasters.indexOf(A).
*
* @param upcasters the upcasters for this serializer.
*/
public void setUpcasters(List upcasters) {
// Wrap the given list in a new UpcasterChain, replacing the chain used so far.
this.upcasterChain = new UpcasterChain(upcasters);
}
/**
* Sets the maximum number of snapshots to archive for an aggregate. The EventStore will keep at most this number
* of
* snapshots per aggregate.
*
* Defaults to {@value #DEFAULT_MAX_SNAPSHOTS_ARCHIVED}.
*
* @param maxSnapshotsArchived The maximum number of snapshots to archive for an aggregate. A value less than 1
* disables pruning of snapshots.
*/
public void setMaxSnapshotsArchived(int maxSnapshotsArchived) {
// Stored as given; appendSnapshotEvent only prunes when this value is greater than 0.
this.maxSnapshotsArchived = maxSnapshotsArchived;
}
// DomainEventStream over an aggregate's messages that asks the enclosing event store for a follow-up batch once
// the current batch is used up. One message is always held as look-ahead.
private final class BatchingDomainEventStream implements DomainEventStream {
// Number of messages in the batch that is currently being iterated.
private int currentBatchSize;
private Iterator currentBatch;
// Look-ahead: the message the next call to next() or peek() returns, or null when nothing is left.
private DomainEventMessage next;
private final Object id;
private final String typeId;
private BatchingDomainEventStream(List firstBatch, Object id, String typeId) {
this.id = id;
this.typeId = typeId;
this.currentBatchSize = firstBatch.size();
this.currentBatch = firstBatch.iterator();
// Prime the look-ahead; it stays null for an empty first batch.
if (currentBatch.hasNext()) {
next = currentBatch.next();
}
}
@Override
public boolean hasNext() {
return next != null;
}
// Returns the look-ahead message and advances it. When the stream is exhausted this returns null rather than
// throwing.
@Override
public DomainEventMessage next() {
DomainEventMessage current = next;
// Fetch again only when the current batch is used up and was at least batchSize long; a shorter batch is
// taken to be the last one.
// NOTE(review): fetchBatch adds whatever createDomainEventMessages returns per stored entry, so the list size
// may differ from the number of entries read if upcasting adds or drops messages - confirm that this size
// check still detects a full page in that case.
if (next != null && !currentBatch.hasNext() && currentBatchSize >= batchSize) {
logger.debug("Fetching new batch for Aggregate [{}]", id);
List newBatch = fetchBatch(typeId, id, next.getSequenceNumber() + 1);
currentBatchSize = newBatch.size();
currentBatch = newBatch.iterator();
}
next = currentBatch.hasNext() ? currentBatch.next() : null;
return current;
}
@Override
public DomainEventMessage peek() {
return next;
}
}
}
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f
Solution content
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.eventstore.jpa;
import org.axonframework.common.jpa.EntityManagerProvider;
import org.axonframework.domain.DomainEventMessage;
import org.axonframework.domain.DomainEventStream;
import org.axonframework.domain.GenericDomainEventMessage;
import org.axonframework.eventstore.EventStreamNotFoundException;
import org.axonframework.eventstore.EventVisitor;
import org.axonframework.eventstore.SerializedDomainEventData;
import org.axonframework.eventstore.SerializedDomainEventMessage;
import org.axonframework.eventstore.SnapshotEventStore;
import org.axonframework.eventstore.jpa.criteria.JpaCriteria;
import org.axonframework.eventstore.jpa.criteria.JpaCriteriaBuilder;
import org.axonframework.eventstore.jpa.criteria.ParameterRegistry;
import org.axonframework.eventstore.management.Criteria;
import org.axonframework.eventstore.management.CriteriaBuilder;
import org.axonframework.eventstore.management.EventStoreManagement;
import org.axonframework.repository.ConcurrencyException;
import org.axonframework.serializer.Serializer;
import org.axonframework.serializer.xml.XStreamSerializer;
import org.axonframework.upcasting.Upcaster;
import org.axonframework.upcasting.UpcasterChain;
import org.axonframework.upcasting.UpcasterAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.persistence.EntityManager;
import javax.sql.DataSource;
import static org.axonframework.common.IdentifierValidator.validateIdentifier;
/**
* An EventStore implementation that uses JPA to store DomainEvents in a database. The actual DomainEvent is stored as
* a
* serialized blob of bytes. Other columns are used to store meta-data that allow quick finding of DomainEvents for a
* specific aggregate in the correct order.
*
* This EventStore supports snapshots pruning, which can be enabled by configuring a {@link #setMaxSnapshotsArchived(int)
* maximum number of snapshots to archive}. By default snapshot pruning is configured to archive only {@value
* #DEFAULT_MAX_SNAPSHOTS_ARCHIVED} snapshot per aggregate.
*
* The serializer used to serialize the events is configurable. By default, the {@link XStreamSerializer} is used.
*
* @author Allard Buijze
* @since 0.5
*/
public class JpaEventStore implements SnapshotEventStore, EventStoreManagement, UpcasterAware {
// Shared SLF4J logger for this event store.
private static final Logger logger = LoggerFactory.getLogger(JpaEventStore.class);
// Initial value of batchSize.
private static final int DEFAULT_BATCH_SIZE = 100;
// Initial value of maxSnapshotsArchived.
private static final int DEFAULT_MAX_SNAPSHOTS_ARCHIVED = 1;
// Serializes payloads and meta-data on write and deserializes them on read.
private final Serializer eventSerializer;
// Performs the actual persisting and querying of event and snapshot entries.
private final EventEntryStore eventEntryStore;
// Passed to the entry store when a batch of events is fetched.
private int batchSize = DEFAULT_BATCH_SIZE;
// Starts out as the empty upcaster chain.
private UpcasterChain upcasterChain = UpcasterChain.EMPTY;
// Supplies the EntityManager used by the read and write operations.
private final EntityManagerProvider entityManagerProvider;
private int maxSnapshotsArchived = DEFAULT_MAX_SNAPSHOTS_ARCHIVED;
// Optional; while null, appendEvents rethrows persistence failures unchanged.
private PersistenceExceptionResolver persistenceExceptionResolver;
/**
* Initialize a JpaEventStore using an {@link org.axonframework.serializer.xml.XStreamSerializer}, which
* serializes events as XML and the default Event Entry store.
*
* The JPA Persistence context is required to contain two entities: {@link DomainEventEntry} and {@link
* SnapshotEventEntry}.
*
* @param entityManagerProvider The EntityManagerProvider providing the EntityManager instance for this EventStore
*/
public JpaEventStore(EntityManagerProvider entityManagerProvider) {
// Delegate to the three-argument constructor with a new XStreamSerializer and a new DefaultEventEntryStore.
this(entityManagerProvider, new XStreamSerializer(), new DefaultEventEntryStore());
}
/**
* Initialize a JpaEventStore using the given eventEntryStore and an {@link
* org.axonframework.serializer.xml.XStreamSerializer}, which serializes events as XML.
*
* @param entityManagerProvider The EntityManagerProvider providing the EntityManager instance for this EventStore
* @param eventEntryStore The instance providing persistence logic for Domain Event entries
*/
public JpaEventStore(EntityManagerProvider entityManagerProvider, EventEntryStore eventEntryStore) {
// Delegate to the three-argument constructor, supplying a new XStreamSerializer as the serializer.
this(entityManagerProvider, new XStreamSerializer(), eventEntryStore);
}
/**
* Initialize a JpaEventStore which serializes events using the given eventSerializer and stores the
* events in the database using the default EventEntryStore.
*
* @param entityManagerProvider The EntityManagerProvider providing the EntityManager instance for this EventStore
* @param eventSerializer The serializer to (de)serialize domain events with.
*/
public JpaEventStore(EntityManagerProvider entityManagerProvider,
Serializer eventSerializer) {
// Delegate to the three-argument constructor, supplying a new DefaultEventEntryStore.
this(entityManagerProvider, eventSerializer, new DefaultEventEntryStore());
}
/**
* Initialize a JpaEventStore which serializes events using the given eventSerializer and stores the
* events in the database using the given eventEntryStore.
*
* @param entityManagerProvider The EntityManagerProvider providing the EntityManager instance for this EventStore
* @param eventSerializer The serializer to (de)serialize domain events with.
* @param eventEntryStore The instance providing persistence logic for Domain Event entries
*/
public JpaEventStore(EntityManagerProvider entityManagerProvider,
Serializer eventSerializer, EventEntryStore eventEntryStore) {
// The collaborators are stored exactly as given; no validation or copying happens here.
this.entityManagerProvider = entityManagerProvider;
this.eventSerializer = eventSerializer;
this.eventEntryStore = eventEntryStore;
}
/**
* {@inheritDoc}
*/
@Override
public void appendEvents(String type, DomainEventStream events) {
DomainEventMessage event = null;
try {
EntityManager entityManager = entityManagerProvider.getEntityManager();
while (events.hasNext()) {
event = events.next();
validateIdentifier(event.getAggregateIdentifier().getClass());
eventEntryStore.persistEvent(type, event, eventSerializer.serialize(event.getPayload(), byte[].class),
eventSerializer.serialize(event.getMetaData(), byte[].class),
entityManager);
}
entityManager.flush();
} catch (RuntimeException exception) {
if (persistenceExceptionResolver != null
&& persistenceExceptionResolver.isDuplicateKeyViolation(exception)) {
throw new ConcurrencyException(
*/
@Override
String.format("Concurrent modification detected for Aggregate identifier [%s], sequence: [%s]",
event.getAggregateIdentifier(),
event.getSequenceNumber()),
exception);
}
throw exception;
}
}
/**
 * {@inheritDoc}
 * <p/>
 * The last snapshot entry, if the EventEntryStore returns one, is converted to a message and placed in front of
 * the events that follow its sequence number. When converting the snapshot fails with a RuntimeException or a
 * LinkageError, a warning is logged and the events are read starting at sequence number 0 instead.
 */
@SuppressWarnings({"unchecked"})
@Override
public DomainEventStream readEvents(String type, Object identifier) {
    EntityManager entityManager = entityManagerProvider.getEntityManager();
    SerializedDomainEventData snapshotEntry = eventEntryStore.loadLastSnapshotEvent(type, identifier,
                                                                                    entityManager);
    DomainEventMessage snapshotEvent = null;
    // -1 means "no usable snapshot", so that reading starts at sequence number 0.
    long lastSnapshotSequence = -1;
    if (snapshotEntry != null) {
        try {
            snapshotEvent = new GenericDomainEventMessage(
                    identifier,
                    snapshotEntry.getSequenceNumber(),
                    (Map) eventSerializer.deserialize(snapshotEntry.getMetaData()));
            lastSnapshotSequence = snapshotEvent.getSequenceNumber();
        } catch (RuntimeException ex) {
            logger.warn("Error while reading snapshot event entry. "
                                + "Reconstructing aggregate on entire event stream. Caused by: {} {}",
                        ex.getClass().getName(),
                        ex.getMessage());
        } catch (LinkageError error) {
            logger.warn("Error while reading snapshot event entry. "
                                + "Reconstructing aggregate on entire event stream. Caused by: {} {}",
                        error.getClass().getName(),
                        error.getMessage());
        }
    }

    List events = fetchBatch(type, identifier, lastSnapshotSequence + 1);
    if (snapshotEvent != null) {
        // The snapshot always precedes the regular events that follow it.
        events.add(0, snapshotEvent);
    } else if (events.isEmpty()) {
        // Neither a snapshot nor any regular event: the stream does not exist.
        throw new EventStreamNotFoundException(type, identifier);
    }
    return new BatchingDomainEventStream(events, identifier, type);
}
@SuppressWarnings({"unchecked"})
private List fetchBatch(String type, Object identifier, long firstSequenceNumber) {
EntityManager entityManager = entityManagerProvider.getEntityManager();
List extends SerializedDomainEventData> entries = eventEntryStore.fetchBatch(type,
identifier,
firstSequenceNumber,
batchSize,
entityManager);
List events = new ArrayList(entries.size());
for (SerializedDomainEventData entry : entries) {
events.addAll(SerializedDomainEventMessage.createDomainEventMessages(eventSerializer,
entry.getEventIdentifier(),
identifier,
entry.getSequenceNumber(),
entry.getTimestamp(),
entry.getPayload(),
entry.getMetaData(),
upcasterChain));
}
return events;
}
/**
* {@inheritDoc}
*
* Upon appending a snapshot, this particular EventStore implementation also prunes snapshots which are considered
* redundant because they fall outside of the range of maximum snapshots to archive.
public void appendSnapshotEvent(String type, DomainEventMessage snapshotEvent) {
EntityManager entityManager = entityManagerProvider.getEntityManager();
// Persist snapshot before pruning redundant archived ones, in order to prevent snapshot misses when reloading
// an aggregate, which may occur when a READ_UNCOMMITTED transaction isolation level is used.
eventEntryStore.persistSnapshot(type, snapshotEvent,
eventSerializer.serialize(snapshotEvent.getPayload(), byte[].class),
eventSerializer.serialize(snapshotEvent.getMetaData(), byte[].class),
entityManager);
if (maxSnapshotsArchived > 0) {
eventEntryStore.pruneSnapshots(type, snapshotEvent, maxSnapshotsArchived,
entityManagerProvider.getEntityManager());
}
}
// Visits events without any filtering: passes a null where clause and an empty parameter map to doVisitEvents.
@Override
public void visitEvents(EventVisitor visitor) {
doVisitEvents(visitor, null, Collections.emptyMap());
}
// Visits the events selected by the given criteria. The criteria instance is cast to JpaCriteria and asked to
// render itself, using "e" as entry alias, into a where clause plus the parameters that clause refers to.
@Override
public void visitEvents(Criteria criteria, EventVisitor visitor) {
    ParameterRegistry registry = new ParameterRegistry();
    StringBuilder whereClause = new StringBuilder();
    JpaCriteria jpaCriteria = (JpaCriteria) criteria;
    jpaCriteria.parse("e", whereClause, registry);
    doVisitEvents(visitor, whereClause.toString(), registry.getParameters());
}
// Returns a new JpaCriteriaBuilder on every call.
@Override
public CriteriaBuilder newCriteriaBuilder() {
return new JpaCriteriaBuilder();
}
private void doVisitEvents(EventVisitor visitor, String whereClause, Map parameters) {
EntityManager entityManager = entityManagerProvider.getEntityManager();
int first = 0;
List extends SerializedDomainEventData> batch;
boolean shouldContinue = true;
while (shouldContinue) {
batch = eventEntryStore.fetchFilteredBatch(whereClause, parameters, first, batchSize, entityManager);
for (SerializedDomainEventData entry : batch) {
List domainEventMessages =
SerializedDomainEventMessage.createDomainEventMessages(entry,
eventSerializer,
eventSerializer,
upcasterChain);
for (DomainEventMessage domainEventMessage : domainEventMessages) {
visitor.doWithEvent(domainEventMessage);
}
}
shouldContinue = (batch.size() >= batchSize);
first += batchSize;
}
}
/**
 * Registers the data source that allows the EventStore to detect the database type and define the error codes that
 * represent concurrent access failures.
 * <p/>
 * Should not be used in combination with {@link #setPersistenceExceptionResolver(PersistenceExceptionResolver)},
 * but rather as a shorthand alternative for most common database types. If a persistenceExceptionResolver has
 * already been set, this method leaves it in place and does nothing.
 *
 * @param dataSource A data source providing access to the backing database
 * @throws SQLException If an error occurs while accessing the dataSource
 */
public void setDataSource(DataSource dataSource) throws SQLException {
    if (persistenceExceptionResolver != null) {
        // A resolver is already configured; do not replace it.
        return;
    }
    persistenceExceptionResolver = new SQLErrorCodesResolver(dataSource);
}
/**
 * Sets the persistenceExceptionResolver that will help detect concurrency exceptions from the backing database.
 * When appending events fails, this resolver is asked whether the failure is a duplicate key violation. Any
 * previously configured resolver is replaced.
 *
 * @param persistenceExceptionResolver the persistenceExceptionResolver that will help detect concurrency
 *                                     exceptions
 */
public void setPersistenceExceptionResolver(PersistenceExceptionResolver persistenceExceptionResolver) {
this.persistenceExceptionResolver = persistenceExceptionResolver;
}
/**
 * Sets the number of events that should be read at each database access. When more than this number of events must
 * be read to rebuild an aggregate's state, the events are read in batches of this size. Defaults to 100.
 * <p/>
 * Tip: if you use a snapshotter, make sure to choose snapshot trigger and batch size such that a single batch will
 * generally retrieve all events required to rebuild an aggregate's state.
 *
 * @param batchSize the number of events to read on each database access. Defaults to 100.
 */
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
// Wraps the given upcasters in a new UpcasterChain, replacing the chain used when creating event messages.
@Override
public void setUpcasters(List upcasters) {
this.upcasterChain = new UpcasterChain(upcasters);
}
/**
 * Sets the maximum number of snapshots to archive for an aggregate. The EventStore will keep at most this number
 * of snapshots per aggregate.
 * <p/>
 * Defaults to {@value #DEFAULT_MAX_SNAPSHOTS_ARCHIVED}.
 *
 * @param maxSnapshotsArchived The maximum number of snapshots to archive for an aggregate. A value less than 1
 *                             disables pruning of snapshots.
 */
public void setMaxSnapshotsArchived(int maxSnapshotsArchived) {
this.maxSnapshotsArchived = maxSnapshotsArchived;
}
/**
 * DomainEventStream that hands out the events of one aggregate and fetches a further batch from the enclosing
 * event store once the current batch is used up, as long as the current batch contained at least batchSize
 * events. The stream always holds the event to return next, so {@link #peek()} and {@link #hasNext()} never
 * access the database.
 */
private final class BatchingDomainEventStream implements DomainEventStream {

    // Size of the batch currently being iterated; a batch smaller than batchSize is treated as the last one.
    private int currentBatchSize;
    private Iterator<DomainEventMessage> currentBatch;
    // The event returned by the following call to next(), or null when the stream is exhausted.
    private DomainEventMessage next;
    private final Object id;
    private final String typeId;

    /**
     * @param firstBatch The events that were already fetched
     * @param id         The identifier of the aggregate, used when fetching further batches
     * @param typeId     The type identifier of the aggregate, used when fetching further batches
     */
    private BatchingDomainEventStream(List<DomainEventMessage> firstBatch, Object id, String typeId) {
        this.id = id;
        this.typeId = typeId;
        this.currentBatchSize = firstBatch.size();
        this.currentBatch = firstBatch.iterator();
        if (currentBatch.hasNext()) {
            next = currentBatch.next();
        }
    }

    @Override
    public boolean hasNext() {
        return next != null;
    }

    /**
     * Returns the look-ahead event and advances the stream, fetching a new batch when the current one has been
     * fully consumed and was a full batch. Returns <code>null</code> when the stream is exhausted.
     */
    @Override
    public DomainEventMessage next() {
        DomainEventMessage current = next;
        if (next != null && !currentBatch.hasNext() && currentBatchSize >= batchSize) {
            logger.debug("Fetching new batch for Aggregate [{}]", id);
            // Continue right after the event that is about to be returned.
            List<DomainEventMessage> newBatch = fetchBatch(typeId, id, next.getSequenceNumber() + 1);
            currentBatchSize = newBatch.size();
            currentBatch = newBatch.iterator();
        }
        next = currentBatch.hasNext() ? currentBatch.next() : null;
        return current;
    }

    @Override
    public DomainEventMessage peek() {
        return next;
    }
}
}
File
JpaEventStore.java
Developer's decision
Manual
Kind of conflict
Class declaration
Comment
Import
Package declaration
Chunk
Conflicting content
<<<<<<< HEAD
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.saga.repository.jpa;
import org.axonframework.saga.Saga;
import org.axonframework.serializer.SerializedObject;
import org.axonframework.serializer.Serializer;
import org.axonframework.serializer.SimpleSerializedObject;
import javax.persistence.Basic;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Lob;
import javax.persistence.Transient;
/**
* Java Persistence Entity allowing sagas to be stored in a relational database.
*
* @author Allard Buijze
* @since 0.7
*/
@Entity
public class SagaEntry {
@SuppressWarnings({"FieldCanBeLocal", "UnusedDeclaration"})
@Id
private String sagaId; // NOSONAR
@Basic
private String sagaType;
@Basic
private String revision;
@Lob
private byte[] serializedSaga;
@Transient
private transient Saga saga;
/**
* Constructs a new SagaEntry for the given saga. The given saga must be serializable. The provided
* saga is not modified by this operation.
*
* @param saga The saga to store
* @param serializer The serialization mechanism to convert the Saga to a byte stream
*/
public SagaEntry(Saga saga, Serializer serializer) {
this.sagaId = saga.getSagaIdentifier();
SerializedObject serialized = serializer.serialize(saga, byte[].class);
this.serializedSaga = serialized.getData();
this.sagaType = serialized.getType().getName();
this.revision = serialized.getType().getRevision();
this.saga = saga;
}
/**
* Returns the Saga instance stored in this entry.
*
* @param serializer The serializer to decode the Saga
* @return the Saga instance stored in this entry
*/
public Saga getSaga(Serializer serializer) {
if (saga != null) {
return saga;
}
return (Saga) serializer.deserialize(new SimpleSerializedObject(serializedSaga, byte[].class,
sagaType, revision));
}
/**
* Constructor required by JPA. Do not use.
*
* @see #SagaEntry(org.axonframework.saga.Saga, org.axonframework.serializer.Serializer)
*/
protected SagaEntry() {
// required by JPA
}
/**
* Returns the serialized form of the Saga.
*
* @return the serialized form of the Saga
*/
public byte[] getSerializedSaga() {
return serializedSaga; //NOSONAR
}
}
=======
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.saga.repository.jpa;
import org.axonframework.saga.Saga;
import org.axonframework.serializer.SerializedObject;
import org.axonframework.serializer.Serializer;
import org.axonframework.serializer.SimpleSerializedObject;
import javax.persistence.Basic;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Lob;
/**
* Java Persistence Entity allowing sagas to be stored in a relational database.
*
* @author Allard Buijze
* @since 0.7
*/
@Entity
public class SagaEntry {
@SuppressWarnings({"FieldCanBeLocal", "UnusedDeclaration"})
@Id
private String sagaId; // NOSONAR
@Basic
private String sagaType;
@Basic
private int revision;
@Lob
private byte[] serializedSaga;
/**
* Constructs a new SagaEntry for the given saga. The given saga must be serializable. The provided
* saga is not modified by this operation.
*
* @param saga The saga to store
* @param serializer The serialization mechanism to convert the Saga to a byte stream
*/
public SagaEntry(Saga saga, Serializer serializer) {
this.sagaId = saga.getSagaIdentifier();
SerializedObject serialized = serializer.serialize(saga, byte[].class);
this.serializedSaga = serialized.getData();
this.sagaType = serialized.getType().getName();
this.revision = serialized.getType().getRevision();
}
/**
* Returns the Saga instance stored in this entry.
*
* @param serializer The serializer to decode the Saga
* @return the Saga instance stored in this entry
*/
public Saga getSaga(Serializer serializer) {
return (Saga) serializer.deserialize(new SimpleSerializedObject(serializedSaga, byte[].class,
sagaType, revision));
}
/**
* Constructor required by JPA. Do not use.
*
* @see #SagaEntry(org.axonframework.saga.Saga, org.axonframework.serializer.Serializer)
*/
protected SagaEntry() {
// required by JPA
}
}
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f
Solution content
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.saga.repository.jpa;
import org.axonframework.saga.Saga;
import org.axonframework.serializer.SerializedObject;
import org.axonframework.serializer.Serializer;
import org.axonframework.serializer.SimpleSerializedObject;
import javax.persistence.Basic;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Lob;
import javax.persistence.Transient;
/**
 * Java Persistence Entity allowing sagas to be stored in a relational database. The entry stores the saga's
 * identifier, the serialized type name and revision, and the serialized bytes. An entry created from a Saga
 * instance additionally keeps a non-persistent reference to that instance.
 *
 * @author Allard Buijze
 * @since 0.7
 */
@Entity
public class SagaEntry {

    @SuppressWarnings({"FieldCanBeLocal", "UnusedDeclaration"})
    @Id
    private String sagaId; // NOSONAR
    @Basic
    private String sagaType;
    @Basic
    private String revision;
    @Lob
    private byte[] serializedSaga;
    // Not persisted: only set by the (Saga, Serializer) constructor, never by the JPA constructor.
    @Transient
    private transient Saga saga;

    /**
     * Constructs a new SagaEntry for the given <code>saga</code>. The given saga must be serializable. The provided
     * saga is not modified by this operation.
     *
     * @param saga       The saga to store
     * @param serializer The serialization mechanism to convert the Saga to a byte stream
     */
    public SagaEntry(Saga saga, Serializer serializer) {
        this.sagaId = saga.getSagaIdentifier();
        // The byte[] type argument is required for getData() to be assignable to the serializedSaga field.
        SerializedObject<byte[]> serialized = serializer.serialize(saga, byte[].class);
        this.serializedSaga = serialized.getData();
        this.sagaType = serialized.getType().getName();
        this.revision = serialized.getType().getRevision();
        this.saga = saga;
    }

    /**
     * Returns the Saga instance stored in this entry. When this entry still references the instance it was
     * created with, that instance is returned; otherwise the stored bytes are deserialized on every call.
     *
     * @param serializer The serializer to decode the Saga
     * @return the Saga instance stored in this entry
     */
    public Saga getSaga(Serializer serializer) {
        if (saga != null) {
            return saga;
        }
        return (Saga) serializer.deserialize(new SimpleSerializedObject(serializedSaga, byte[].class,
                                                                        sagaType, revision));
    }

    /**
     * Constructor required by JPA. Do not use.
     *
     * @see #SagaEntry(org.axonframework.saga.Saga, org.axonframework.serializer.Serializer)
     */
    protected SagaEntry() {
        // required by JPA
    }

    /**
     * Returns the serialized form of the Saga. The returned array is the entry's own array, not a copy.
     *
     * @return the serialized form of the Saga
     */
    public byte[] getSerializedSaga() {
        return serializedSaga; //NOSONAR
    }
}
File
SagaEntry.java
Developer's decision
Version 1
Kind of conflict
Annotation
Class declaration
Comment
Import
Package declaration
Chunk
Conflicting content
<<<<<<< HEAD:core/src/main/java/org/axonframework/serializer/AbstractXStreamSerializer.java
=======
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f:core/src/main/java/org/axonframework/serializer/XStreamSerializer.java
package org.axonframework.serializer;
import com.thoughtworks.xstream.XStream;
import java.util.UUID;
/**
<<<<<<< HEAD:core/src/main/java/org/axonframework/serializer/AbstractXStreamSerializer.java
* Abstract implementation for XStream based serializers. It provides some helper methods and configuration features
* independent of the actual format used to marshal to.
*
* @author Allard Buijze
* @since 2.0
*/
public abstract class AbstractXStreamSerializer implements Serializer {
=======
* Serializer that uses XStream to serialize and deserialize arbitrary objects. The XStream instance is configured to
* deal with the Classes used in Axon Framework in the most compact fashion.
*
* When running on a Sun JVM, XStream does not pose any restrictions on classes to serialize. On other JVM's, however,
* you need to either implement Serializable, or provide a default constructor (accessible under the JVM's security
* policy). That means that for portability, you should do either of these two.
*
* @author Allard Buijze
* @see com.thoughtworks.xstream.XStream
* @since 1.2
*/
public class XStreamSerializer implements Serializer {
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f:core/src/main/java/org/axonframework/serializer/XStreamSerializer.java
private static final Charset DEFAULT_CHARSET_NAME = Charset.forName("UTF-8");
private final XStream xStream;
Solution content
import java.util.UUID;
/**
* Abstract implementation for XStream based serializers. It provides some helper methods and configuration features
* independent of the actual format used to marshal to.
*
* @author Allard Buijze
* @since 2.0
*/
public abstract class AbstractXStreamSerializer implements Serializer {
private static final Charset DEFAULT_CHARSET_NAME = Charset.forName("UTF-8");
private final XStream xStream;
File
AbstractXStreamSerializer.java
Developer's decision
Version 1
Kind of conflict
Class signature
Comment
Chunk
Conflicting content
private static final Charset DEFAULT_CHARSET_NAME = Charset.forName("UTF-8");
private final XStream xStream;
private final Charset charset;
<<<<<<< HEAD:core/src/main/java/org/axonframework/serializer/AbstractXStreamSerializer.java
private volatile UpcasterChain upcasters;
private ConverterFactory converterFactory;
/**
* Initialize a generic serializer using the UTF-8 character set and a default XStream instance.
*/
protected AbstractXStreamSerializer() {
=======
private ConverterFactory converterFactory;
/**
* Initialize a generic serializer using the UTF-8 character set. A default XStream instance (with {@link
* com.thoughtworks.xstream.io.xml.XppDriver}) is used to perform the serialization.
*/
public XStreamSerializer() {
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f:core/src/main/java/org/axonframework/serializer/XStreamSerializer.java
this(DEFAULT_CHARSET_NAME);
}
Solution content
private static final Charset DEFAULT_CHARSET_NAME = Charset.forName("UTF-8");
private final XStream xStream;
private final Charset charset;
private ConverterFactory converterFactory;
/**
* Initialize a generic serializer using the UTF-8 character set and a default XStream instance.
*/
protected AbstractXStreamSerializer() {
this(DEFAULT_CHARSET_NAME);
}
File
AbstractXStreamSerializer.java
Developer's decision
Combination
Kind of conflict
Attribute
Comment
Method signature
Chunk
Conflicting content
*
* @param xStream XStream instance to use
*/
<<<<<<< HEAD:core/src/main/java/org/axonframework/serializer/AbstractXStreamSerializer.java
protected AbstractXStreamSerializer(XStream xStream) {
=======
public XStreamSerializer(XStream xStream) {
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f:core/src/main/java/org/axonframework/serializer/XStreamSerializer.java
this(DEFAULT_CHARSET_NAME, xStream);
}
Solution content
*
* @param xStream XStream instance to use
*/
protected AbstractXStreamSerializer(XStream xStream) {
this(DEFAULT_CHARSET_NAME, xStream);
}
File
AbstractXStreamSerializer.java
Developer's decision
Version 1
Kind of conflict
Method signature
Chunk
Conflicting content
*
* @param charset The character set to use
*/
<<<<<<< HEAD:core/src/main/java/org/axonframework/serializer/AbstractXStreamSerializer.java
public AbstractXStreamSerializer(Charset charset) {
=======
public XStreamSerializer(Charset charset) {
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f:core/src/main/java/org/axonframework/serializer/XStreamSerializer.java
this(charset, new XStream());
}
Solution content
*
* @param charset The character set to use
*/
public AbstractXStreamSerializer(Charset charset) {
this(charset, new XStream());
}
File
AbstractXStreamSerializer.java
Developer's decision
Version 1
Kind of conflict
Method signature
Chunk
Conflicting content
* @param charset The character set to use
* @param xStream The XStream instance to use
*/
<<<<<<< HEAD:core/src/main/java/org/axonframework/serializer/AbstractXStreamSerializer.java
public AbstractXStreamSerializer(Charset charset, XStream xStream) {
=======
public XStreamSerializer(Charset charset, XStream xStream) {
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f:core/src/main/java/org/axonframework/serializer/XStreamSerializer.java
this(charset, xStream, new ChainingConverterFactory());
}
Solution content
* @param charset The character set to use
* @param xStream The XStream instance to use
*/
public AbstractXStreamSerializer(Charset charset, XStream xStream) {
this(charset, xStream, new ChainingConverterFactory());
}
File
AbstractXStreamSerializer.java
Developer's decision
Version 1
Kind of conflict
Method signature
Chunk
Conflicting content
}
/**
<<<<<<< HEAD:core/src/main/java/org/axonframework/serializer/AbstractXStreamSerializer.java
* Registers any converters that are specific to the type of content written by this serializer.
*
* @param converterFactory the ConverterFactory to register the converters with
*/
protected abstract void registerConverters(ChainingConverterFactory converterFactory);
/**
* Initialize the serializer using the given charset, xStream instance and
* converterFactory. The xStream instance is configured with several converters for the
* most common types in Axon.
*
* @param charset The character set to use
* @param xStream The XStream instance to use
* @param converterFactory The ConverterFactory providing the necessary content converters
*/
public AbstractXStreamSerializer(Charset charset, XStream xStream, ConverterFactory converterFactory) {
this.charset = charset;
this.xStream = xStream;
this.converterFactory = converterFactory;
if (converterFactory instanceof ChainingConverterFactory) {
registerConverters((ChainingConverterFactory) converterFactory);
}
=======
* Initialize the serializer using the given charset and xStream instance. The given
* converterFactory is used to convert serialized objects for use by Upcasters. The
* xStream instance is configured with several converters for the most common types in Axon.
*
* @param charset The character set to use
* @param xStream The XStream instance to use
* @param converterFactory The factory providing the converter instances for upcasters
*/
public XStreamSerializer(Charset charset, XStream xStream, ConverterFactory converterFactory) {
this.charset = charset;
this.xStream = xStream;
this.converterFactory = converterFactory;
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f:core/src/main/java/org/axonframework/serializer/XStreamSerializer.java
xStream.registerConverter(new JodaTimeConverter());
xStream.addImmutableType(UUID.class);
xStream.aliasPackage("axon.domain", "org.axonframework.domain");
Solution content
}
/**
* Registers any converters that are specific to the type of content written by this serializer.
*
* @param converterFactory the ConverterFactory to register the converters with
*/
protected abstract void registerConverters(ChainingConverterFactory converterFactory);
/**
* Initialize the serializer using the given charset, xStream instance and
* converterFactory. The xStream instance is configured with several converters for the
* most common types in Axon.
*
* @param charset The character set to use
* @param xStream The XStream instance to use
* @param converterFactory The ConverterFactory providing the necessary content converters
*/
public AbstractXStreamSerializer(Charset charset, XStream xStream, ConverterFactory converterFactory) {
this.charset = charset;
this.xStream = xStream;
this.converterFactory = converterFactory;
if (converterFactory instanceof ChainingConverterFactory) {
registerConverters((ChainingConverterFactory) converterFactory);
}
xStream.registerConverter(new JodaTimeConverter());
xStream.addImmutableType(UUID.class);
xStream.aliasPackage("axon.domain", "org.axonframework.domain");
*/
@Override
public SerializedObject serialize(Object object, Class expectedType) {
<<<<<<< HEAD:core/src/main/java/org/axonframework/serializer/AbstractXStreamSerializer.java
T result = doSerialize(object, expectedType, xStream);
return new SimpleSerializedObject(result, expectedType, typeIdentifierOf(object.getClass()),
=======
ByteArrayOutputStream baos = new ByteArrayOutputStream();
xStream.marshal(object, new CompactWriter(new OutputStreamWriter(baos, charset)));
T converted = converterFactory.getConverter(byte[].class, expectedType).convert(baos.toByteArray());
return new SimpleSerializedObject(converted, expectedType, typeIdentifierOf(object.getClass()),
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f:core/src/main/java/org/axonframework/serializer/XStreamSerializer.java
revisionOf(object.getClass()));
}
Solution content
*/
@Override
public <T> SerializedObject<T> serialize(Object object, Class<T> expectedType) {
    // The subclass produces the serialized form in the requested representation.
    T result = doSerialize(object, expectedType, xStream);
    // Type name and revision are both derived from the runtime class of the serialized object.
    return new SimpleSerializedObject<T>(result, expectedType, typeIdentifierOf(object.getClass()),
                                         revisionOf(object.getClass()));
}
File
AbstractXStreamSerializer.java
Developer's decision
Version 1
Kind of conflict
Method invocation
Return statement
Variable
Chunk
Conflicting content
}
/**
<<<<<<< HEAD:core/src/main/java/org/axonframework/serializer/AbstractXStreamSerializer.java
* {@inheritDoc}
*/
@SuppressWarnings({"unchecked"})
@Override
public Object deserialize(SerializedObject serializedObject) {
UpcasterChain currentUpcasterChain = upcasters; // create copy for concurrency reasons
SerializedObject current = serializedObject;
if (currentUpcasterChain != null) {
current = currentUpcasterChain.upcast(current);
}
return doDeserialize(current, xStream);
}
/**
* Serialize the given object to the given expectedFormat. The subclass may use {@link
* #convert(Class, Class, Object)} to convert the result of the serialization to the expected type.
*
* @param object The object to serialize
* @param expectedFormat The format in which the serialized object must be returned
* @param xStream The XStream instance to serialize with
* @param The format in which the serialized object must be returned
* @return The serialized object
*/
protected abstract T doSerialize(Object object, Class expectedFormat, XStream xStream);
/**
* Deserialize the given serializedObject.
*
* @param serializedObject The instance containing the serialized format of the object
* @param xStream The XStream instance to deserialize with
* @return the deserialized object
*/
protected abstract Object doDeserialize(SerializedObject serializedObject, XStream xStream);
/**
* Convert the given source, of type sourceType to the given targetType.
*
* @param sourceType The type of data that needs to be converted. Should be a content type identifier, not
* necessarily the result of source.getClass().
* @param targetType The target type of the conversion
* @param source The object to convert
* @param The type of data that needs to be converted
* @param The target type of the conversion
* @return The converted object
*/
protected T convert(Class sourceType, Class targetType, S source) {
return getConverterFactory().getConverter(sourceType, targetType).convert(source);
}
/**
=======
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f:core/src/main/java/org/axonframework/serializer/XStreamSerializer.java
* Returns the revision number for the given type. The default implementation checks for an {@link
* Revision @Revision} annotation, and returns 0 if none was found. This method can be safely
* overridden by subclasses.
Solution content
}
/**
 * Serialize the given <code>object</code> to the given <code>expectedFormat</code>. The subclass may use {@link
 * #convert(Class, Class, Object)} to convert the result of the serialization to the expected type.
 *
 * @param object         The object to serialize
 * @param expectedFormat The format in which the serialized object must be returned
 * @param xStream        The XStream instance to serialize with
 * @param <T>            The format in which the serialized object must be returned
 * @return The serialized object
 */
protected abstract <T> T doSerialize(Object object, Class<T> expectedFormat, XStream xStream);
/**
 * Deserializes the given <code>serializedObject</code> using the given <code>xStream</code> instance.
 *
 * @param serializedObject The instance containing the serialized format of the object
 * @param xStream          The XStream instance to deserialize with
 * @return the deserialized object
 */
protected abstract Object doDeserialize(SerializedObject serializedObject, XStream xStream);
/**
 * Convert the given <code>source</code>, of type <code>sourceType</code> to the given <code>targetType</code>,
 * using the converter that this serializer's ConverterFactory provides for that pair of types.
 *
 * @param sourceType The type of data that needs to be converted. Should be a content type identifier, not
 *                   necessarily the result of <code>source.getClass()</code>.
 * @param targetType The target type of the conversion
 * @param source     The object to convert
 * @param <S>        The type of data that needs to be converted
 * @param <T>        The target type of the conversion
 * @return The converted object
 */
protected <S, T> T convert(Class<S> sourceType, Class<T> targetType, S source) {
    return getConverterFactory().getConverter(sourceType, targetType).convert(source);
}
/**
* Returns the revision number for the given type. The default implementation checks for an {@link
* Revision @Revision} annotation, and returns 0 if none was found. This method can be safely
* overridden by subclasses.
File
AbstractXStreamSerializer.java
Developer's decision
Combination
Kind of conflict
Annotation
Comment
Method declaration
Method interface
Chunk
Conflicting content
=======
* @param type The type for which to return the revision number
* @return the revision number for the given type
*/
<<<<<<< HEAD:core/src/main/java/org/axonframework/serializer/AbstractXStreamSerializer.java
protected String revisionOf(Class> type) {
Revision revision = type.getAnnotation(Revision.class);
return revision == null ? null : revision.value();
protected int revisionOf(Class> type) {
Revision revision = type.getAnnotation(Revision.class);
return revision == null ? 0 : revision.value();
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f:core/src/main/java/org/axonframework/serializer/XStreamSerializer.java
}
/**
Solution content
* @param type The type for which to return the revision number
* @return the revision number for the given type
*/
protected String revisionOf(Class> type) {
Revision revision = type.getAnnotation(Revision.class);
return revision == null ? null : revision.value();
}
/**
File
AbstractXStreamSerializer.java
Developer's decision
Version 1
Kind of conflict
Method invocation
Method signature
Return statement
Variable
Chunk
Conflicting content
/**
* {@inheritDoc}
*/
<<<<<<< HEAD:core/src/main/java/org/axonframework/serializer/AbstractXStreamSerializer.java
@Override
public Class classForType(SerializedType type) {
UpcasterChain currentUpcasterChain = upcasters; // create copy for concurrency reasons
if (currentUpcasterChain != null) {
type = currentUpcasterChain.upcast(type);
}
=======
@SuppressWarnings({"unchecked"})
@Override
public Object deserialize(SerializedObject serializedObject) {
if ("org.dom4j.Document".equals(serializedObject.getContentType().getName())) {
return xStream.unmarshal(new Dom4JReader((Document) serializedObject.getData()));
} else {
SerializedObject convertedSerializedObject =
converterFactory.getConverter(serializedObject.getContentType(),
InputStream.class)
.convert(serializedObject);
return xStream.fromXML(new InputStreamReader(convertedSerializedObject.getData(), charset));
}
}
/**
* {@inheritDoc}
*/
@Override
public Class classForType(SerializedType type) {
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f:core/src/main/java/org/axonframework/serializer/XStreamSerializer.java
return xStream.getMapper().realClass(type.getName());
}
Solution content
/**
* {@inheritDoc}
*/
@Override
public Object deserialize(SerializedObject serializedObject) {
return doDeserialize(serializedObject, xStream);
}
/**
* {@inheritDoc}
*/
@Override
public Class classForType(SerializedType type) {
return xStream.getMapper().realClass(type.getName());
}
File
AbstractXStreamSerializer.java
Developer's decision
Manual
Kind of conflict
Annotation
Attribute
Comment
If statement
Method declaration
Method signature
Variable
Chunk
Conflicting content
return converterFactory;
}
<<<<<<< HEAD:core/src/main/java/org/axonframework/serializer/AbstractXStreamSerializer.java
/**
* Returns the type identifier for the given type. It uses the aliasing rules configured in XStream.
*
* @param type The type to get the type identifier of
* @return A String containing the type identifier of the given class
*/
protected String typeIdentifierOf(Class> type) {
return xStream.getMapper().serializedClass(type);
}
/**
* Sets the upcasters which allow older revisions of serialized objects to be deserialized. Upcasters are evaluated
* in the order they are provided in the given List. That means that you should take special precaution when an
* upcaster expects another upcaster to have processed an event.
*
* Any upcaster that relies on another upcaster doing its work first, should be placed after that other
* upcaster in the given list. Thus for any upcaster B that relies on upcaster A to do its work
* first, the following must be true: upcasters.indexOf(B) > upcasters.indexOf(A).
*
* @param upcasters the upcasters for this serializer.
*/
public void setUpcasters(List upcasters) {
this.upcasters = new UpcasterChain(converterFactory, upcasters);
}
=======
private String typeIdentifierOf(Class> type) {
return xStream.getMapper().serializedClass(type);
}
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f:core/src/main/java/org/axonframework/serializer/XStreamSerializer.java
/**
* XStream Converter to serialize DateTime classes as a String.
Solution content
return converterFactory;
}
/**
* Returns the type identifier for the given type. It uses the aliasing rules configured in XStream.
*
* @param type The type to get the type identifier of
* @return A String containing the type identifier of the given class
*/
protected String typeIdentifierOf(Class> type) {
return xStream.getMapper().serializedClass(type);
}
/**
* XStream Converter to serialize DateTime classes as a String.
File
AbstractXStreamSerializer.java
Developer's decision
Combination
Kind of conflict
Comment
Method declaration
Chunk
Conflicting content
try {
Constructor constructor = context.getRequiredType().getConstructor(Object.class);
return constructor.newInstance(reader.getValue());
<<<<<<< HEAD:core/src/main/java/org/axonframework/serializer/AbstractXStreamSerializer.java
} catch (Exception e) { // NOSONAR
=======
} catch (Exception e) {
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f:core/src/main/java/org/axonframework/serializer/XStreamSerializer.java
throw new SerializationException(String.format(
"An exception occurred while deserializing a Joda Time object: %s",
context.getRequiredType().getSimpleName()), e);
Solution content
try {
Constructor constructor = context.getRequiredType().getConstructor(Object.class);
return constructor.newInstance(reader.getValue());
} catch (Exception e) { // NOSONAR
throw new SerializationException(String.format(
"An exception occurred while deserializing a Joda Time object: %s",
context.getRequiredType().getSimpleName()), e);
File
AbstractXStreamSerializer.java
Developer's decision
Version 1
Kind of conflict
Catch clause
Comment
Chunk
Conflicting content
<<<<<<< HEAD
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.serializer;
import org.axonframework.common.SerializationException;
import org.axonframework.common.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
/**
* Serializer implementation that uses Java serialization to serialize and deserialize object instances. This
* implementation is very suitable if the life span of the serialized objects allows classes to remain unchanged. If
* Class definitions need to be changed during the object's life cycle, another implementation, like the
* {@link org.axonframework.serializer.xml.XStreamSerializer} might be a more suitable alternative.
*
* @author Allard Buijze
* @since 2.0
*/
public class JavaSerializer implements Serializer {
private static final Logger logger = LoggerFactory.getLogger(JavaSerializer.class);
private final ConverterFactory converterFactory = new ChainingConverterFactory();
@Override
public SerializedObject serialize(Object instance, Class expectedType) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
ObjectOutputStream oos = new ObjectOutputStream(baos);
try {
oos.writeObject(instance);
} finally {
oos.flush();
}
} catch (IOException e) {
throw new SerializationException("An exception occurred writing serialized data to the output stream", e);
}
new SimpleSerializedType(instance.getClass().getName(), revisionOf(instance.getClass()));
T converted = converterFactory.getConverter(byte[].class, expectedType)
.convert(baos.toByteArray());
return new SimpleSerializedObject(converted, expectedType, instance.getClass().getName(),
revisionOf(instance.getClass()));
}
@Override
public boolean canSerializeTo(Class expectedRepresentation) {
return (converterFactory.hasConverter(byte[].class, expectedRepresentation));
}
@Override
public Object deserialize(SerializedObject serializedObject) {
SerializedObject converted = converterFactory.getConverter(serializedObject.getContentType(),
InputStream.class)
.convert(serializedObject);
InputStream stream = converted.getData();
try {
ObjectInputStream ois = new ObjectInputStream(stream);
return ois.readObject();
} catch (ClassNotFoundException e) {
throw new SerializationException("An error occurred while deserializing: " + e.getMessage(), e);
} catch (IOException e) {
throw new SerializationException("The theoretically impossible has just happened: "
+ "An IOException while reading to a ByteArrayInputStream.", e);
} finally {
IOUtils.closeQuietly(stream);
}
}
@Override
public Class classForType(SerializedType type) {
try {
return Class.forName(type.getName());
} catch (ClassNotFoundException e) {
logger.warn("Could not load class for serialized type [{}] revision {}",
type.getName(), type.getRevision());
return null;
}
}
/**
* Returns the revision number for the given type. The default implementation checks for an {@link
* Revision @Revision} annotation, and returns 0 if none was found. This method can be safely
* overridden by subclasses.
*
* The revision number is used by upcasters to decide whether they need to process a certain serialized event.
* Generally, the revision number needs to be increased each time the structure of an event has been changed in an
* incompatible manner.
*
* @param type The type for which to return the revision number
* @return the revision number for the given type
*/
protected String revisionOf(Class> type) {
Revision revision = type.getAnnotation(Revision.class);
return revision == null ? null : revision.value();
}
}
=======
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.serializer;
import org.axonframework.common.SerializationException;
import org.axonframework.common.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
/**
* Serializer implementation that uses Java serialization to serialize and deserialize object instances. This
* implementation is very suitable if the life span of the serialized objects allows classes to remain unchanged. If
* Class definitions need to be changed during the object's life cycle, another implementation, like the
* {@link XStreamSerializer} might be a more suitable alternative.
*
* @author Allard Buijze
* @since 2.0
*/
public class JavaSerializer implements Serializer {
private static final Logger logger = LoggerFactory.getLogger(JavaSerializer.class);
private final ConverterFactory converterFactory = new ChainingConverterFactory();
@Override
public SerializedObject serialize(Object instance, Class expectedType) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
ObjectOutputStream oos = new ObjectOutputStream(baos);
try {
oos.writeObject(instance);
} finally {
oos.flush();
}
} catch (IOException e) {
throw new SerializationException("An exception occurred writing serialized data to the output stream", e);
}
new SimpleSerializedType(instance.getClass().getName(), revisionOf(instance.getClass()));
T converted = converterFactory.getConverter(byte[].class, expectedType)
.convert(baos.toByteArray());
return new SimpleSerializedObject(converted, expectedType, instance.getClass().getName(),
revisionOf(instance.getClass()));
}
@Override
public Object deserialize(SerializedObject serializedObject) {
SerializedObject converted = converterFactory.getConverter(serializedObject.getContentType(),
InputStream.class)
.convert(serializedObject);
InputStream stream = converted.getData();
try {
ObjectInputStream ois = new ObjectInputStream(stream);
return ois.readObject();
} catch (ClassNotFoundException e) {
throw new SerializationException("An error occurred while deserializing: " + e.getMessage(), e);
} catch (IOException e) {
throw new SerializationException("The theoretically impossible has just happened: "
+ "An IOException while reading to a ByteArrayInputStream.", e);
} finally {
IOUtils.closeQuietly(stream);
}
}
@Override
public Class classForType(SerializedType type) {
try {
return Class.forName(type.getName());
} catch (ClassNotFoundException e) {
logger.warn("Could not load class for serialized type [{}] revision {}",
type.getName(), type.getRevision());
return null;
}
}
/**
* Returns the revision number for the given type. The default implementation checks for an {@link
* Revision @Revision} annotation, and returns 0 if none was found. This method can be safely
* overridden by subclasses.
*
* The revision number is used by upcasters to decide whether they need to process a certain serialized event.
* Generally, the revision number needs to be increased each time the structure of an event has been changed in an
* incompatible manner.
*
* @param type The type for which to return the revision number
* @return the revision number for the given type
*/
protected int revisionOf(Class> type) {
Revision revision = type.getAnnotation(Revision.class);
return revision == null ? 0 : revision.value();
}
}
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f
Solution content
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.serializer;
import org.axonframework.common.SerializationException;
import org.axonframework.common.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
/**
* Serializer implementation that uses Java serialization to serialize and deserialize object instances. This
* implementation is very suitable if the life span of the serialized objects allows classes to remain unchanged. If
* Class definitions need to be changed during the object's life cycle, another implementation, like the
* {@link org.axonframework.serializer.xml.XStreamSerializer} might be a more suitable alternative.
*
* @author Allard Buijze
* @since 2.0
*/
public class JavaSerializer implements Serializer {
private static final Logger logger = LoggerFactory.getLogger(JavaSerializer.class);
private final ConverterFactory converterFactory = new ChainingConverterFactory();
@Override
public SerializedObject serialize(Object instance, Class expectedType) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
ObjectOutputStream oos = new ObjectOutputStream(baos);
try {
oos.writeObject(instance);
} finally {
oos.flush();
}
} catch (IOException e) {
throw new SerializationException("An exception occurred writing serialized data to the output stream", e);
}
new SimpleSerializedType(instance.getClass().getName(), revisionOf(instance.getClass()));
T converted = converterFactory.getConverter(byte[].class, expectedType)
.convert(baos.toByteArray());
return new SimpleSerializedObject(converted, expectedType, instance.getClass().getName(),
revisionOf(instance.getClass()));
}
@Override
public boolean canSerializeTo(Class expectedRepresentation) {
return (converterFactory.hasConverter(byte[].class, expectedRepresentation));
}
@Override
public Object deserialize(SerializedObject serializedObject) {
SerializedObject converted = converterFactory.getConverter(serializedObject.getContentType(),
InputStream.class)
.convert(serializedObject);
InputStream stream = converted.getData();
try {
ObjectInputStream ois = new ObjectInputStream(stream);
return ois.readObject();
} catch (ClassNotFoundException e) {
throw new SerializationException("An error occurred while deserializing: " + e.getMessage(), e);
} catch (IOException e) {
throw new SerializationException("The theoretically impossible has just happened: "
+ "An IOException while reading to a ByteArrayInputStream.", e);
} finally {
IOUtils.closeQuietly(stream);
}
}
@Override
public Class classForType(SerializedType type) {
try {
return Class.forName(type.getName());
} catch (ClassNotFoundException e) {
logger.warn("Could not load class for serialized type [{}] revision {}",
type.getName(), type.getRevision());
return null;
}
}
/**
* Returns the revision number for the given type. The default implementation checks for an {@link
* Revision @Revision} annotation, and returns 0 if none was found. This method can be safely
* overridden by subclasses.
*
* The revision number is used by upcasters to decide whether they need to process a certain serialized event.
* Generally, the revision number needs to be increased each time the structure of an event has been changed in an
* incompatible manner.
*
* @param type The type for which to return the revision number
* @return the revision number for the given type
*/
protected String revisionOf(Class> type) {
Revision revision = type.getAnnotation(Revision.class);
return revision == null ? null : revision.value();
}
}
File
JavaSerializer.java
Developer's decision
Version 1
Kind of conflict
Class declaration
Comment
Import
Package declaration
Chunk
Conflicting content
<<<<<<< HEAD
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.serializer;
/**
* Interface describing a serialization mechanism. Implementations can serialize objects of given type T
* to an output stream and read the object back in from an input stream.
*
* @author Allard Buijze
* @since 1.2
*/
public interface Serializer {
/**
* Serialize the given object into a Serialized Object containing the given
* expectedRepresentation.
*
* Use {@link #canSerializeTo(Class)} to detect whether the expectedRepresentation is supported by
* this serializer.
*
* @param object The object to serialize
* @param expectedRepresentation The expected data type representing the serialized object
* @param The expected data type representing the serialized object
* @return the instance representing the serialized object.
*/
SerializedObject serialize(Object object, Class expectedRepresentation);
/**
* Indicates whether this Serializer is capable of serializing to the given expectedRepresentation.
*
* When true, this does *not* guarantee that the serialization and (optional) conversion will also
* succeed when executed. For example, when a serializer produces a byte[] containing JSON, trying to
* convert to a Dom4J Document will fail, even though this serializer has a converter to convert
* byte[]
* to Dom4J instances.
*
* @param expectedRepresentation The type of data a Serialized Object should contain
* @param The type of data a Serialized Object should contain
* @return true if the expectedRepresentation is supported, otherwise false.
*/
boolean canSerializeTo(Class expectedRepresentation);
/**
* Deserializes the first object read from the given bytes. The bytes are not consumed
* from the array or modified in any way. The resulting object instance is cast to the expected type.
*
* @param serializedObject the instance describing the type of object and the bytes providing the serialized data
* @param The data type of the serialized object
* @return the serialized object, cast to the expected type
*
* @throws ClassCastException if the first object in the stream is not an instance of <T>.
*/
Object deserialize(SerializedObject serializedObject);
/**
* Returns the class for the given type identifier. The result of this method must guarantee that the deserialized
* SerializedObject with the given type is an instance of the returned Class.
*
* @param type The type identifier of the object
* @return the Class representing the type of the serialized Object.
*/
Class classForType(SerializedType type);
}
=======
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.serializer;
/**
* Interface describing a serialization mechanism. Implementations can serialize objects of given type T
* to an output stream and read the object back in from an input stream.
*
* @author Allard Buijze
* @since 1.2
*/
public interface Serializer {
/**
* Serialize the given object into a byte[].
*
* @param object The object to serialize
* @param expectedRepresentation The expected data type representing the serialized object
* @param The expected data type representing the serialized object
* @return the instance representing the serialized object.
*/
SerializedObject serialize(Object object, Class expectedRepresentation);
/**
* Deserializes the first object read from the given bytes. The bytes are not consumed
* from the array or modified in any way. The resulting object instance is cast to the expected type.
*
* @param serializedObject the instance describing the type of object and the bytes providing the serialized data
* @param The data type of the serialized object
* @return the serialized object, cast to the expected type
*
* @throws ClassCastException if the first object in the stream is not an instance of <T>.
*/
Object deserialize(SerializedObject serializedObject);
/**
* Returns the class for the given type identifier. The result of this method must guarantee that the deserialized
* SerializedObject with the given type is an instance of the returned Class.
*
* @param type The type identifier of the object
* @return the Class representing the type of the serialized Object.
*/
Class classForType(SerializedType type);
}
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f
Solution content
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.serializer;
/**
* Interface describing a serialization mechanism. Implementations can serialize objects of given type T
* to an output stream and read the object back in from an input stream.
*
* @author Allard Buijze
* @since 1.2
*/
public interface Serializer {
/**
* Serialize the given object into a Serialized Object containing the given
* expectedRepresentation.
*
* Use {@link #canSerializeTo(Class)} to detect whether the expectedRepresentation is supported by
* this serializer.
*
* @param object The object to serialize
* @param expectedRepresentation The expected data type representing the serialized object
* @param The expected data type representing the serialized object
* @return the instance representing the serialized object.
*/
SerializedObject serialize(Object object, Class expectedRepresentation);
/**
* Indicates whether this Serializer is capable of serializing to the given expectedRepresentation.
*
* When true, this does *not* guarantee that the serialization and (optional) conversion will also
* succeed when executed. For example, when a serializer produces a byte[] containing JSON, trying to
* convert to a Dom4J Document will fail, even though this serializer has a converter to convert
* byte[]
* to Dom4J instances.
*
* @param expectedRepresentation The type of data a Serialized Object should contain
* @param The type of data a Serialized Object should contain
* @return true if the expectedRepresentation is supported, otherwise false.
*/
boolean canSerializeTo(Class expectedRepresentation);
/**
* Deserializes the first object read from the given bytes. The bytes are not consumed
* from the array or modified in any way. The resulting object instance is cast to the expected type.
*
* @param serializedObject the instance describing the type of object and the bytes providing the serialized data
* @param The data type of the serialized object
* @return the serialized object, cast to the expected type
*
* @throws ClassCastException if the first object in the stream is not an instance of <T>.
*/
Object deserialize(SerializedObject serializedObject);
/**
* Returns the class for the given type identifier. The result of this method must guarantee that the deserialized
* SerializedObject with the given type is an instance of the returned Class.
*
* @param type The type identifier of the object
* @return the Class representing the type of the serialized Object.
*/
Class classForType(SerializedType type);
}
File
Serializer.java
Developer's decision
Version 1
Kind of conflict
Comment
Interface declaration
Package declaration
Chunk
Conflicting content
@Test
@Test
}
<<<<<<< HEAD
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.domain;
import org.axonframework.serializer.SimpleSerializedObject;
import org.axonframework.serializer.xml.XStreamSerializer;
import org.junit.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import static org.junit.Assert.*;
/**
* @author Allard Buijze
*/
public class AbstractAggregateRootTest {
private AggregateRoot testSubject;
@Before
public void setUp() {
testSubject = new AggregateRoot();
}
@Test
public void testSerializability_GenericXStreamSerializer() throws IOException {
XStreamSerializer serializer = new XStreamSerializer();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
baos.write(serializer.serialize(testSubject, byte[].class).getData());
assertEquals(0, deserialized(baos).getUncommittedEventCount());
assertFalse(deserialized(baos).getUncommittedEvents().hasNext());
assertNotNull(deserialized(baos).getIdentifier());
AggregateRoot deserialized = deserialized(baos);
deserialized.doSomething();
assertEquals(1, deserialized.getUncommittedEventCount());
assertNotNull(deserialized.getUncommittedEvents().next());
AggregateRoot deserialized2 = deserialized(baos);
deserialized2.doSomething();
assertNotNull(deserialized2.getUncommittedEvents().next());
assertEquals(1, deserialized2.getUncommittedEventCount());
}
private AggregateRoot deserialized(ByteArrayOutputStream baos) {
XStreamSerializer serializer = new XStreamSerializer();
return (AggregateRoot) serializer.deserialize(new SimpleSerializedObject(baos.toByteArray(),
byte[].class,
"ignored",
"0"));
}
@Test
public void testRegisterEvent() {
assertEquals(0, testSubject.getUncommittedEventCount());
testSubject.doSomething();
assertEquals(1, testSubject.getUncommittedEventCount());
}
public void testReadEventStreamDuringEventCommit() {
testSubject.doSomething();
testSubject.doSomething();
DomainEventStream uncommittedEvents = testSubject.getUncommittedEvents();
uncommittedEvents.next();
testSubject.commitEvents();
assertTrue(uncommittedEvents.hasNext());
assertNotNull(uncommittedEvents.next());
assertFalse(uncommittedEvents.hasNext());
}
private static class AggregateRoot extends AbstractAggregateRoot {
private final Object identifier;
private AggregateRoot() {
identifier = IdentifierFactory.getInstance().generateIdentifier();
}
@Override
public Object getIdentifier() {
return identifier;
}
public void doSomething() {
registerEvent(new StubDomainEvent());
}
}
}
=======
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.domain;
import org.axonframework.serializer.SimpleSerializedObject;
import org.axonframework.serializer.XStreamSerializer;
import org.junit.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import static org.junit.Assert.*;
/**
* @author Allard Buijze
*/
public class AbstractAggregateRootTest {
private AggregateRoot testSubject;
@Before
public void setUp() {
testSubject = new AggregateRoot();
}
@Test
public void testSerializability_GenericXStreamSerializer() throws IOException {
XStreamSerializer serializer = new XStreamSerializer();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
baos.write(serializer.serialize(testSubject, byte[].class).getData());
assertEquals(0, deserialized(baos).getUncommittedEventCount());
assertFalse(deserialized(baos).getUncommittedEvents().hasNext());
assertNotNull(deserialized(baos).getIdentifier());
AggregateRoot deserialized = deserialized(baos);
deserialized.doSomething();
assertEquals(1, deserialized.getUncommittedEventCount());
assertNotNull(deserialized.getUncommittedEvents().next());
AggregateRoot deserialized2 = deserialized(baos);
deserialized2.doSomething();
assertNotNull(deserialized2.getUncommittedEvents().next());
assertEquals(1, deserialized2.getUncommittedEventCount());
}
private AggregateRoot deserialized(ByteArrayOutputStream baos) {
XStreamSerializer serializer = new XStreamSerializer();
return (AggregateRoot) serializer.deserialize(new SimpleSerializedObject(baos.toByteArray(),
byte[].class,
"ignored",
0));
public void testRegisterEvent() {
assertEquals(0, testSubject.getUncommittedEventCount());
testSubject.doSomething();
assertEquals(1, testSubject.getUncommittedEventCount());
}
@Test
public void testReadEventStreamDuringEventCommit() {
testSubject.doSomething();
testSubject.doSomething();
DomainEventStream uncommittedEvents = testSubject.getUncommittedEvents();
uncommittedEvents.next();
testSubject.commitEvents();
assertTrue(uncommittedEvents.hasNext());
assertNotNull(uncommittedEvents.next());
assertFalse(uncommittedEvents.hasNext());
}
private static class AggregateRoot extends AbstractAggregateRoot {
private final Object identifier;
private AggregateRoot() {
identifier = IdentifierFactory.getInstance().generateIdentifier();
}
@Override
public Object getIdentifier() {
return identifier;
}
public void doSomething() {
registerEvent(new StubDomainEvent());
}
}
}
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f
Solution content
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.domain;
import org.axonframework.serializer.SimpleSerializedObject;
import org.axonframework.serializer.xml.XStreamSerializer;
import org.junit.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import static org.junit.Assert.*;
/**
* Tests for the event bookkeeping of {@code AbstractAggregateRoot}, exercised through the minimal private
* {@code AggregateRoot} subclass declared at the bottom of this class. Covered here: an XStream serialization
* round trip, registration of a single event, and reading an uncommitted event stream across a commit.
*
* @author Allard Buijze
*/
public class AbstractAggregateRootTest {
// Fresh aggregate under test; re-created before every test by setUp().
private AggregateRoot testSubject;
/**
* Creates a new aggregate instance, so every test starts from an aggregate on which doSomething() has not
* been called.
*/
@Before
public void setUp() {
testSubject = new AggregateRoot();
}
/**
* Serializes the fresh aggregate to bytes with an {@code XStreamSerializer} and deserializes those same bytes
* several times, asserting that each copy starts without uncommitted events, carries an identifier, and counts
* exactly one uncommitted event after a single call to doSomething().
*
* @throws IOException declared because of {@code ByteArrayOutputStream.write(byte[])}
*/
@Test
public void testSerializability_GenericXStreamSerializer() throws IOException {
XStreamSerializer serializer = new XStreamSerializer();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// Capture the serialized form once; every deserialized(baos) call below reads from this same buffer.
baos.write(serializer.serialize(testSubject, byte[].class).getData());
// A freshly deserialized copy has no uncommitted events and a non-null identifier.
assertEquals(0, deserialized(baos).getUncommittedEventCount());
assertFalse(deserialized(baos).getUncommittedEvents().hasNext());
assertNotNull(deserialized(baos).getIdentifier());
// Registering one event on a deserialized copy is visible through both the count and the stream.
AggregateRoot deserialized = deserialized(baos);
deserialized.doSomething();
assertEquals(1, deserialized.getUncommittedEventCount());
assertNotNull(deserialized.getUncommittedEvents().next());
// A second copy from the same bytes also ends at exactly one event, i.e. it does not see the event
// registered on the first copy.
AggregateRoot deserialized2 = deserialized(baos);
deserialized2.doSomething();
assertNotNull(deserialized2.getUncommittedEvents().next());
assertEquals(1, deserialized2.getUncommittedEventCount());
}
/**
* Deserializes the current contents of the given buffer into an aggregate using a new
* {@code XStreamSerializer}.
*
* @param baos buffer holding the serialized aggregate
* @return the deserialized aggregate, cast to {@code AggregateRoot}
*/
private AggregateRoot deserialized(ByteArrayOutputStream baos) {
XStreamSerializer serializer = new XStreamSerializer();
// NOTE(review): "ignored" and "0" are presumably the type name and revision of the serialized object;
// confirm against the SimpleSerializedObject constructor.
return (AggregateRoot) serializer.deserialize(new SimpleSerializedObject(baos.toByteArray(),
byte[].class,
"ignored",
"0"));
}
/**
* Verifies that the uncommitted event count goes from 0 to 1 after a single call to doSomething().
*/
@Test
public void testRegisterEvent() {
assertEquals(0, testSubject.getUncommittedEventCount());
testSubject.doSomething();
assertEquals(1, testSubject.getUncommittedEventCount());
}
/**
* Verifies that an uncommitted event stream obtained before commitEvents() can still be read to its end
* afterwards: two events are registered, one is consumed, the aggregate commits, and the stream must still
* deliver exactly one more event.
*/
@Test
public void testReadEventStreamDuringEventCommit() {
testSubject.doSomething();
testSubject.doSomething();
DomainEventStream uncommittedEvents = testSubject.getUncommittedEvents();
// Consume the first of the two events before committing.
uncommittedEvents.next();
testSubject.commitEvents();
// The stream obtained before the commit still yields the remaining event, then is exhausted.
assertTrue(uncommittedEvents.hasNext());
assertNotNull(uncommittedEvents.next());
assertFalse(uncommittedEvents.hasNext());
}
/**
* Minimal {@code AbstractAggregateRoot} subclass used as the test fixture. It holds an identifier obtained
* from the {@code IdentifierFactory} and exposes a single operation that registers a stub event.
*/
private static class AggregateRoot extends AbstractAggregateRoot {
private final Object identifier;
private AggregateRoot() {
identifier = IdentifierFactory.getInstance().generateIdentifier();
}
@Override
public Object getIdentifier() {
return identifier;
}
/**
* Registers a new {@code StubDomainEvent} on this aggregate.
*/
public void doSomething() {
registerEvent(new StubDomainEvent());
}
}
}
File
AbstractAggregateRootTest.java
Developer's decision
Version 1
Kind of conflict
Class declaration
Comment
Import
Package declaration
Chunk
Conflicting content
<<<<<<< HEAD
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.domain;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.converters.reflection.PureJavaReflectionProvider;
import org.axonframework.serializer.SimpleSerializedObject;
import org.axonframework.serializer.xml.XStreamSerializer;
import org.junit.*;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;
import java.util.UUID;
import static org.junit.Assert.*;
/**
* @author Allard Buijze
*/
public class JavaSerializationTest {
private static final Charset UTF8 = Charset.forName("UTF-8");
@Test
public void testSerialize_XStreamWithPureJavaReflectionProvider() {
XStream xstream = new XStream(new PureJavaReflectionProvider());
XStreamSerializer serializer = new XStreamSerializer(UTF8, xstream);
StubAnnotatedAggregate aggregateRoot = new StubAnnotatedAggregate(UUID.randomUUID());
aggregateRoot.doSomething();
String xml = new String(serializer.serialize(aggregateRoot, byte[].class).getData(), UTF8);
assertNotNull(xml);
@Test
StubAnnotatedAggregate unmarshalled = (StubAnnotatedAggregate) serializer.deserialize(
new SimpleSerializedObject(xml.getBytes(UTF8), byte[].class, "ignored", "0"));
validateAggregateCondition(aggregateRoot, unmarshalled);
}
@Test
aggregateRoot.doSomething();
public void testSerialize_XStreamWithDefaultReflectionProvider() {
XStream xstream = new XStream();
XStreamSerializer serializer = new XStreamSerializer(UTF8, xstream);
StubAnnotatedAggregate aggregateRoot = new StubAnnotatedAggregate(UUID.randomUUID());
aggregateRoot.doSomething();
byte[] data = serializer.serialize(aggregateRoot, byte[].class).getData();
String xml = new String(data, UTF8);
assertNotNull(xml);
StubAnnotatedAggregate unmarshalled = (StubAnnotatedAggregate) serializer.deserialize(
new SimpleSerializedObject(data, byte[].class, "ignored", "0"));
validateAggregateCondition(aggregateRoot, unmarshalled);
}
@Test
public void testSerialize_JavaSerialization() throws IOException, ClassNotFoundException {
StubAnnotatedAggregate aggregateRoot = new StubAnnotatedAggregate(UUID.randomUUID());
aggregateRoot.doSomething();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
new ObjectOutputStream(baos).writeObject(aggregateRoot);
byte[] serialized = baos.toByteArray();
assertNotNull(serialized);
StubAnnotatedAggregate unmarshalled = (StubAnnotatedAggregate) new ObjectInputStream(
new ByteArrayInputStream(serialized)).readObject();
validateAggregateCondition(aggregateRoot, unmarshalled);
}
private void validateAggregateCondition(StubAnnotatedAggregate original, StubAnnotatedAggregate unmarshalled) {
assertNotNull(unmarshalled);
assertEquals(original.getIdentifier(), unmarshalled.getIdentifier());
assertEquals(null, unmarshalled.getVersion());
assertEquals(1, unmarshalled.getUncommittedEventCount());
unmarshalled.commitEvents();
assertEquals((Long) 0L, unmarshalled.getVersion());
unmarshalled.doSomething();
assertEquals(1L, unmarshalled.getUncommittedEvents().next().getSequenceNumber());
}
}
=======
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.domain;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.converters.reflection.PureJavaReflectionProvider;
import org.axonframework.serializer.SimpleSerializedObject;
import org.axonframework.serializer.XStreamSerializer;
import org.junit.*;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;
import java.util.UUID;
import static org.junit.Assert.*;
/**
* @author Allard Buijze
*/
public class JavaSerializationTest {
private static final Charset UTF8 = Charset.forName("UTF-8");
public void testSerialize_XStreamWithPureJavaReflectionProvider() {
XStream xstream = new XStream(new PureJavaReflectionProvider());
XStreamSerializer serializer = new XStreamSerializer(UTF8, xstream);
StubAnnotatedAggregate aggregateRoot = new StubAnnotatedAggregate(UUID.randomUUID());
String xml = new String(serializer.serialize(aggregateRoot, byte[].class).getData(), UTF8);
assertNotNull(xml);
StubAnnotatedAggregate unmarshalled = (StubAnnotatedAggregate) serializer.deserialize(
new SimpleSerializedObject(xml.getBytes(UTF8), byte[].class, "ignored", 0));
validateAggregateCondition(aggregateRoot, unmarshalled);
}
@Test
public void testSerialize_XStreamWithDefaultReflectionProvider() {
XStream xstream = new XStream();
XStreamSerializer serializer = new XStreamSerializer(UTF8, xstream);
StubAnnotatedAggregate aggregateRoot = new StubAnnotatedAggregate(UUID.randomUUID());
aggregateRoot.doSomething();
byte[] data = serializer.serialize(aggregateRoot, byte[].class).getData();
String xml = new String(data, UTF8);
assertNotNull(xml);
StubAnnotatedAggregate unmarshalled = (StubAnnotatedAggregate) serializer.deserialize(
new SimpleSerializedObject(data, byte[].class, "ignored", 0));
validateAggregateCondition(aggregateRoot, unmarshalled);
}
@Test
public void testSerialize_JavaSerialization() throws IOException, ClassNotFoundException {
StubAnnotatedAggregate aggregateRoot = new StubAnnotatedAggregate(UUID.randomUUID());
aggregateRoot.doSomething();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
new ObjectOutputStream(baos).writeObject(aggregateRoot);
byte[] serialized = baos.toByteArray();
assertNotNull(serialized);
StubAnnotatedAggregate unmarshalled = (StubAnnotatedAggregate) new ObjectInputStream(
new ByteArrayInputStream(serialized)).readObject();
validateAggregateCondition(aggregateRoot, unmarshalled);
}
private void validateAggregateCondition(StubAnnotatedAggregate original, StubAnnotatedAggregate unmarshalled) {
assertNotNull(unmarshalled);
assertEquals(original.getIdentifier(), unmarshalled.getIdentifier());
assertEquals(null, unmarshalled.getVersion());
assertEquals(1, unmarshalled.getUncommittedEventCount());
unmarshalled.commitEvents();
assertEquals((Long) 0L, unmarshalled.getVersion());
unmarshalled.doSomething();
assertEquals(1L, unmarshalled.getUncommittedEvents().next().getSequenceNumber());
}
}
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f
Solution content
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.domain;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.converters.reflection.PureJavaReflectionProvider;
import org.axonframework.serializer.SimpleSerializedObject;
import org.axonframework.serializer.xml.XStreamSerializer;
import org.junit.*;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;
import java.util.UUID;
import static org.junit.Assert.*;
/**
* Round-trips a {@code StubAnnotatedAggregate} through three serialization mechanisms (XStream with the pure
* Java reflection provider, XStream with its default reflection provider, and standard Java object
* serialization) and checks via {@code validateAggregateCondition} that the restored aggregate is in the
* expected state.
*
* @author Allard Buijze
*/
public class JavaSerializationTest {
// Charset handed to the XStreamSerializer and used for the byte[] <-> String conversions in these tests.
private static final Charset UTF8 = Charset.forName("UTF-8");
/**
* Serializes the aggregate with an XStream instance built on {@code PureJavaReflectionProvider}, converts the
* bytes to a String and back using UTF-8, and validates the deserialized aggregate.
*/
@Test
public void testSerialize_XStreamWithPureJavaReflectionProvider() {
XStream xstream = new XStream(new PureJavaReflectionProvider());
XStreamSerializer serializer = new XStreamSerializer(UTF8, xstream);
StubAnnotatedAggregate aggregateRoot = new StubAnnotatedAggregate(UUID.randomUUID());
// One call before serializing; validateAggregateCondition expects exactly one uncommitted event afterwards.
aggregateRoot.doSomething();
String xml = new String(serializer.serialize(aggregateRoot, byte[].class).getData(), UTF8);
assertNotNull(xml);
// Deserialize from the bytes re-encoded out of the String, not from the serializer's original array.
StubAnnotatedAggregate unmarshalled = (StubAnnotatedAggregate) serializer.deserialize(
new SimpleSerializedObject(xml.getBytes(UTF8), byte[].class, "ignored", "0"));
validateAggregateCondition(aggregateRoot, unmarshalled);
}
/**
* Serializes the aggregate with a default-constructed XStream instance and validates the aggregate
* deserialized directly from the produced byte array.
*/
@Test
public void testSerialize_XStreamWithDefaultReflectionProvider() {
XStream xstream = new XStream();
XStreamSerializer serializer = new XStreamSerializer(UTF8, xstream);
StubAnnotatedAggregate aggregateRoot = new StubAnnotatedAggregate(UUID.randomUUID());
aggregateRoot.doSomething();
byte[] data = serializer.serialize(aggregateRoot, byte[].class).getData();
// The String form is only checked for non-nullness; deserialization below uses the raw bytes.
String xml = new String(data, UTF8);
assertNotNull(xml);
StubAnnotatedAggregate unmarshalled = (StubAnnotatedAggregate) serializer.deserialize(
new SimpleSerializedObject(data, byte[].class, "ignored", "0"));
validateAggregateCondition(aggregateRoot, unmarshalled);
}
/**
* Writes the aggregate with an {@code ObjectOutputStream} into an in-memory buffer, reads it back with an
* {@code ObjectInputStream}, and validates the restored aggregate.
*
* @throws IOException if writing to or reading from the object streams fails
* @throws ClassNotFoundException if the class of the serialized object cannot be found while reading
*/
@Test
public void testSerialize_JavaSerialization() throws IOException, ClassNotFoundException {
StubAnnotatedAggregate aggregateRoot = new StubAnnotatedAggregate(UUID.randomUUID());
aggregateRoot.doSomething();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
new ObjectOutputStream(baos).writeObject(aggregateRoot);
byte[] serialized = baos.toByteArray();
assertNotNull(serialized);
StubAnnotatedAggregate unmarshalled = (StubAnnotatedAggregate) new ObjectInputStream(
new ByteArrayInputStream(serialized)).readObject();
validateAggregateCondition(aggregateRoot, unmarshalled);
}
/**
* Asserts the state expected of an aggregate restored from its serialized form, then drives it one step
* further to check that it continues from that state.
*
* @param original the aggregate that was serialized; only its identifier is compared
* @param unmarshalled the aggregate produced by deserialization; it is modified by this method (events are
* committed and one more doSomething() call is made)
*/
private void validateAggregateCondition(StubAnnotatedAggregate original, StubAnnotatedAggregate unmarshalled) {
assertNotNull(unmarshalled);
assertEquals(original.getIdentifier(), unmarshalled.getIdentifier());
// Before the commit the version is null and the single pre-serialization event is still uncommitted.
assertEquals(null, unmarshalled.getVersion());
assertEquals(1, unmarshalled.getUncommittedEventCount());
unmarshalled.commitEvents();
// After the commit the version is 0, and the next registered event gets sequence number 1.
assertEquals((Long) 0L, unmarshalled.getVersion());
unmarshalled.doSomething();
assertEquals(1L, unmarshalled.getUncommittedEvents().next().getSequenceNumber());
}
}
File
JavaSerializationTest.java
Developer's decision
Version 1
Kind of conflict
Class declaration
Comment
Import
Package declaration
Chunk
Conflicting content
<<<<<<< HEAD
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.eventstore.jpa;
import org.axonframework.domain.DomainEventMessage;
import org.axonframework.domain.MetaData;
import org.axonframework.eventstore.SerializedDomainEventMessage;
import org.axonframework.serializer.SerializedMetaData;
import org.axonframework.serializer.SerializedObject;
import org.axonframework.serializer.Serializer;
import org.axonframework.serializer.SimpleSerializedObject;
import org.joda.time.DateTime;
import org.junit.*;
import java.util.Collections;
import java.util.UUID;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
* @author Allard Buijze
*/
public class DomainEventEntryTest {
private DomainEventMessage mockDomainEvent;
private SerializedObject mockPayload = new SimpleSerializedObject("PayloadBytes".getBytes(),
byte[].class, "Mock", "0");
private SerializedObject mockMetaData = new SerializedMetaData("MetaDataBytes".getBytes(),
byte[].class);
private Serializer mockSerializer;
private MetaData metaData;
@Before
public void setUp() {
mockDomainEvent = mock(DomainEventMessage.class);
mockSerializer = mock(Serializer.class);
metaData = new MetaData(Collections.singletonMap("Key", "Value"));
when(mockSerializer.deserialize(mockPayload)).thenReturn("Payload");
when(mockSerializer.deserialize(isA(SerializedMetaData.class))).thenReturn(metaData);
}
@Test
public void testDomainEventEntry_WrapEventsCorrectly() {
UUID aggregateIdentifier = UUID.randomUUID();
DateTime timestamp = new DateTime();
UUID eventIdentifier = UUID.randomUUID();
when(mockDomainEvent.getAggregateIdentifier()).thenReturn(aggregateIdentifier);
when(mockDomainEvent.getSequenceNumber()).thenReturn(2L);
when(mockDomainEvent.getTimestamp()).thenReturn(timestamp);
when(mockDomainEvent.getIdentifier()).thenReturn(eventIdentifier.toString());
when(mockDomainEvent.getPayloadType()).thenReturn(String.class);
DomainEventEntry actualResult = new DomainEventEntry("test", mockDomainEvent, mockPayload, mockMetaData);
assertEquals(aggregateIdentifier.toString(), actualResult.getAggregateIdentifier());
assertEquals(2L, actualResult.getSequenceNumber());
assertEquals(timestamp, actualResult.getTimestamp());
assertEquals("test", actualResult.getType());
DomainEventMessage> domainEvent = actualResult.getDomainEvent(mockSerializer);
assertTrue(domainEvent instanceof SerializedDomainEventMessage);
verify(mockSerializer, never()).deserialize(mockPayload);
verify(mockSerializer, never()).deserialize(mockMetaData);
assertEquals("Payload", domainEvent.getPayload());
verify(mockSerializer).deserialize(mockPayload);
verify(mockSerializer, never()).deserialize(mockMetaData);
MetaData actual = domainEvent.getMetaData();
verify(mockSerializer).deserialize(isA(SerializedMetaData.class));
assertEquals(metaData, actual);
}
}
=======
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.eventstore.jpa;
import org.axonframework.domain.DomainEventMessage;
import org.axonframework.domain.MetaData;
import org.axonframework.eventstore.SerializedDomainEventMessage;
import org.axonframework.serializer.SerializedMetaData;
import org.axonframework.serializer.SerializedObject;
import org.axonframework.serializer.Serializer;
import org.axonframework.serializer.SimpleSerializedObject;
import org.axonframework.upcasting.UpcasterChain;
import org.joda.time.DateTime;
import org.junit.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.UUID;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
* @author Allard Buijze
*/
public class DomainEventEntryTest {
private DomainEventMessage mockDomainEvent;
private SerializedObject mockPayload = new SimpleSerializedObject("PayloadBytes".getBytes(),
byte[].class, "Mock", 0);
private SerializedObject mockMetaData = new SerializedMetaData("MetaDataBytes".getBytes(),
byte[].class);
private Serializer mockSerializer;
private MetaData metaData;
private UpcasterChain upcasterChain = new UpcasterChain(new ArrayList());
@Before
public void setUp() {
mockDomainEvent = mock(DomainEventMessage.class);
mockSerializer = mock(Serializer.class);
metaData = new MetaData(Collections.singletonMap("Key", "Value"));
when(mockSerializer.deserialize(mockPayload)).thenReturn("Payload");
when(mockSerializer.deserialize(isA(SerializedMetaData.class))).thenReturn(metaData);
}
@Test
public void testDomainEventEntry_WrapEventsCorrectly() {
UUID aggregateIdentifier = UUID.randomUUID();
DateTime timestamp = new DateTime();
UUID eventIdentifier = UUID.randomUUID();
when(mockDomainEvent.getAggregateIdentifier()).thenReturn(aggregateIdentifier);
when(mockDomainEvent.getSequenceNumber()).thenReturn(2L);
when(mockDomainEvent.getTimestamp()).thenReturn(timestamp);
when(mockDomainEvent.getIdentifier()).thenReturn(eventIdentifier.toString());
when(mockDomainEvent.getPayloadType()).thenReturn(String.class);
DomainEventEntry actualResult = new DomainEventEntry("test", mockDomainEvent, mockPayload, mockMetaData);
assertEquals(aggregateIdentifier.toString(), actualResult.getAggregateIdentifier());
assertEquals(2L, actualResult.getSequenceNumber());
assertEquals(timestamp, actualResult.getTimestamp());
assertEquals("test", actualResult.getType());
DomainEventMessage> domainEvent = actualResult.getDomainEvents(mockSerializer, upcasterChain).get(0);
assertTrue(domainEvent instanceof SerializedDomainEventMessage);
verify(mockSerializer, never()).deserialize(mockPayload);
verify(mockSerializer, never()).deserialize(mockMetaData);
assertEquals("Payload", domainEvent.getPayload());
verify(mockSerializer).deserialize(mockPayload);
verify(mockSerializer, never()).deserialize(mockMetaData);
MetaData actual = domainEvent.getMetaData();
verify(mockSerializer).deserialize(isA(SerializedMetaData.class));
assertEquals(metaData, actual);
}
}
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f
Solution content
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.eventstore.jpa;
import org.axonframework.domain.DomainEventMessage;
import org.axonframework.domain.MetaData;
import org.axonframework.eventstore.SerializedDomainEventMessage;
import org.axonframework.serializer.SerializedMetaData;
import org.axonframework.serializer.SerializedObject;
import org.axonframework.serializer.Serializer;
import org.axonframework.serializer.SimpleSerializedObject;
import org.axonframework.upcasting.UpcasterChain;
import org.joda.time.DateTime;
import org.junit.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.UUID;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
* Tests that a {@code DomainEventEntry} copies the identifying data of the domain event it wraps and that the
* event message it hands back deserializes its payload and meta data only when they are first requested.
*
* @author Allard Buijze
*/
public class DomainEventEntryTest {
private DomainEventMessage mockDomainEvent;
// Fixed serialized payload; setUp() stubs the serializer to turn exactly this instance into "Payload".
private SerializedObject mockPayload = new SimpleSerializedObject("PayloadBytes".getBytes(),
byte[].class, "Mock", "0");
private SerializedObject mockMetaData = new SerializedMetaData("MetaDataBytes".getBytes(),
byte[].class);
private Serializer mockSerializer;
private MetaData metaData;
// Chain built from an empty list, i.e. no upcasters are configured for this test.
private UpcasterChain upcasterChain = new UpcasterChain(new ArrayList());
/**
* Creates the mocks and stubs the serializer: deserializing {@code mockPayload} returns "Payload", and
* deserializing any {@code SerializedMetaData} returns the {@code metaData} fixture.
*/
@Before
public void setUp() {
mockDomainEvent = mock(DomainEventMessage.class);
mockSerializer = mock(Serializer.class);
metaData = new MetaData(Collections.singletonMap("Key", "Value"));
when(mockSerializer.deserialize(mockPayload)).thenReturn("Payload");
when(mockSerializer.deserialize(isA(SerializedMetaData.class))).thenReturn(metaData);
}
/**
* Builds an entry from the mocked event and the serialized fixtures, checks the values exposed by the entry,
* then checks that the first message returned by {@code getDomainEvents} is a
* {@code SerializedDomainEventMessage} that calls the serializer only when the payload, respectively the
* meta data, is accessed.
*/
@Test
public void testDomainEventEntry_WrapEventsCorrectly() {
UUID aggregateIdentifier = UUID.randomUUID();
DateTime timestamp = new DateTime();
UUID eventIdentifier = UUID.randomUUID();
when(mockDomainEvent.getAggregateIdentifier()).thenReturn(aggregateIdentifier);
when(mockDomainEvent.getSequenceNumber()).thenReturn(2L);
when(mockDomainEvent.getTimestamp()).thenReturn(timestamp);
when(mockDomainEvent.getIdentifier()).thenReturn(eventIdentifier.toString());
when(mockDomainEvent.getPayloadType()).thenReturn(String.class);
DomainEventEntry actualResult = new DomainEventEntry("test", mockDomainEvent, mockPayload, mockMetaData);
// The entry exposes the aggregate identifier in its String form.
assertEquals(aggregateIdentifier.toString(), actualResult.getAggregateIdentifier());
assertEquals(2L, actualResult.getSequenceNumber());
assertEquals(timestamp, actualResult.getTimestamp());
assertEquals("test", actualResult.getType());
DomainEventMessage<?> domainEvent = actualResult.getDomainEvents(mockSerializer, upcasterChain).get(0);
assertTrue(domainEvent instanceof SerializedDomainEventMessage);
// Obtaining the message must not have touched the serializer yet.
verify(mockSerializer, never()).deserialize(mockPayload);
verify(mockSerializer, never()).deserialize(mockMetaData);
// Reading the payload deserializes the payload only; the meta data is still untouched.
assertEquals("Payload", domainEvent.getPayload());
verify(mockSerializer).deserialize(mockPayload);
verify(mockSerializer, never()).deserialize(mockMetaData);
// Reading the meta data deserializes a SerializedMetaData and yields the stubbed fixture.
MetaData actual = domainEvent.getMetaData();
verify(mockSerializer).deserialize(isA(SerializedMetaData.class));
assertEquals(metaData, actual);
}
}
File
DomainEventEntryTest.java
Developer's decision
Combination
Kind of conflict
Class declaration
Comment
Import
Package declaration
Chunk
Conflicting content
<<<<<<< HEAD
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.eventstore.jpa;
import org.axonframework.common.jpa.SimpleEntityManagerProvider;
@Override
@Test
/**
import org.axonframework.domain.DomainEventMessage;
import org.axonframework.domain.DomainEventStream;
import org.axonframework.domain.GenericDomainEventMessage;
import org.axonframework.domain.MetaData;
import org.axonframework.domain.SimpleDomainEventStream;
import org.axonframework.eventhandling.annotation.EventHandler;
import org.axonframework.eventsourcing.annotation.AbstractAnnotatedAggregateRoot;
import org.axonframework.eventstore.EventStreamNotFoundException;
import org.axonframework.eventstore.EventVisitor;
import org.axonframework.eventstore.management.CriteriaBuilder;
import org.axonframework.repository.ConcurrencyException;
import org.axonframework.serializer.SerializedObject;
import org.axonframework.serializer.SerializedType;
import org.axonframework.serializer.Serializer;
import org.axonframework.serializer.SimpleSerializedObject;
import org.joda.time.DateTime;
import org.joda.time.DateTimeUtils;
import org.junit.*;
import org.junit.runner.*;
import org.mockito.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.PersistenceException;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
* @author Allard Buijze
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {
"classpath:/META-INF/spring/db-context.xml",
"classpath:/META-INF/spring/test-context.xml"})
@Transactional()
public class JpaEventStoreTest {
@Autowired
private JpaEventStore testSubject;
@PersistenceContext
private EntityManager entityManager;
private StubAggregateRoot aggregate1;
private StubAggregateRoot aggregate2;
private Object mockAggregateIdentifier;
@Before
public void setUp() {
mockAggregateIdentifier = UUID.randomUUID();
aggregate1 = new StubAggregateRoot(mockAggregateIdentifier);
for (int t = 0; t < 10; t++) {
aggregate1.changeState();
}
aggregate2 = new StubAggregateRoot();
aggregate2.changeState();
aggregate2.changeState();
aggregate2.changeState();
entityManager.createQuery("DELETE FROM DomainEventEntry").executeUpdate();
}
@After
public void tearDown() {
// just to make sure
DateTimeUtils.setCurrentMillisSystem();
}
@Test(expected = IllegalArgumentException.class)
public void testStoreAndLoadEvents_BadIdentifierType() {
testSubject.appendEvents("type", new SimpleDomainEventStream(
new GenericDomainEventMessage(new BadIdentifierType(), 1, new Object())));
}
@Test
public void testStoreAndLoadEvents() {
assertNotNull(testSubject);
testSubject.appendEvents("test", aggregate1.getUncommittedEvents());
entityManager.flush();
assertEquals((long) aggregate1.getUncommittedEventCount(),
entityManager.createQuery("SELECT count(e) FROM DomainEventEntry e").getSingleResult());
// we store some more events to make sure only correct events are retrieved
testSubject.appendEvents("test", new SimpleDomainEventStream(
new GenericDomainEventMessage(aggregate2.getIdentifier(),
0,
new Object(),
Collections.singletonMap("key", (Object) "Value"))));
entityManager.flush();
entityManager.clear();
DomainEventStream events = testSubject.readEvents("test", aggregate1.getIdentifier());
List actualEvents = new ArrayList();
while (events.hasNext()) {
DomainEventMessage event = events.next();
event.getPayload();
event.getMetaData();
actualEvents.add(event);
}
assertEquals(aggregate1.getUncommittedEventCount(), actualEvents.size());
/// we make sure persisted events have the same MetaData alteration logic
DomainEventStream other = testSubject.readEvents("test", aggregate2.getIdentifier());
assertTrue(other.hasNext());
DomainEventMessage messageWithMetaData = other.next();
DomainEventMessage altered = messageWithMetaData.withMetaData(Collections.singletonMap("key2",
(Object) "value"));
DomainEventMessage combined = messageWithMetaData.andMetaData(Collections.singletonMap("key2",
(Object) "value"));
assertTrue(altered.getMetaData().containsKey("key2"));
altered.getPayload();
assertFalse(altered.getMetaData().containsKey("key"));
assertTrue(altered.getMetaData().containsKey("key2"));
assertTrue(combined.getMetaData().containsKey("key"));
assertTrue(combined.getMetaData().containsKey("key2"));
assertNotNull(messageWithMetaData.getPayload());
assertNotNull(messageWithMetaData.getMetaData());
assertFalse(messageWithMetaData.getMetaData().isEmpty());
}
@Test
public void testLoad_LargeAmountOfEvents() {
List> domainEvents = new ArrayList>(110);
String aggregateIdentifier = "id";
for (int t = 0; t < 110; t++) {
domainEvents.add(new GenericDomainEventMessage(aggregateIdentifier, (long) t,
"Mock contents", MetaData.emptyInstance()));
}
testSubject.appendEvents("test", new SimpleDomainEventStream(domainEvents));
entityManager.flush();
entityManager.clear();
DomainEventStream events = testSubject.readEvents("test", aggregateIdentifier);
long t = 0L;
while (events.hasNext()) {
DomainEventMessage event = events.next();
assertEquals(t, event.getSequenceNumber());
t++;
}
assertEquals(110L, t);
}
@Test
public void testLoad_LargeAmountOfEventsInSmallBatches() {
testSubject.setBatchSize(10);
testLoad_LargeAmountOfEvents();
}
@Test
public void testEntireStreamIsReadOnUnserializableSnapshot_WithException() {
List> domainEvents = new ArrayList>(110);
String aggregateIdentifier = "id";
for (int t = 0; t < 110; t++) {
domainEvents.add(new GenericDomainEventMessage(aggregateIdentifier, (long) t,
"Mock contents", MetaData.emptyInstance()));
}
testSubject.appendEvents("test", new SimpleDomainEventStream(domainEvents));
final Serializer serializer = new Serializer() {
@Override
public SerializedObject serialize(Object object, Class expectedType) {
Assert.assertEquals(byte[].class, expectedType);
return new SimpleSerializedObject("this ain't gonna work".getBytes(), byte[].class, "failingType", "0");
}
@Override
public boolean canSerializeTo(Class expectedRepresentation) {
return byte[].class.equals(expectedRepresentation);
}
public Object deserialize(SerializedObject serializedObject) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public Class classForType(SerializedType type) {
try {
return Class.forName(type.getName());
} catch (ClassNotFoundException e) {
return null;
}
}
};
final DomainEventMessage stubDomainEvent = new GenericDomainEventMessage(
aggregateIdentifier,
(long) 30,
"Mock contents", MetaData.emptyInstance()
);
SnapshotEventEntry entry = new SnapshotEventEntry(
"test", stubDomainEvent,
serializer.serialize(stubDomainEvent.getPayload(), byte[].class),
serializer.serialize(stubDomainEvent.getMetaData(), byte[].class));
entityManager.persist(entry);
entityManager.flush();
entityManager.clear();
DomainEventStream stream = testSubject.readEvents("test", aggregateIdentifier);
assertEquals(0L, stream.peek().getSequenceNumber());
}
@Test
public void testEntireStreamIsReadOnUnserializableSnapshot_WithError() {
List> domainEvents = new ArrayList>(110);
String aggregateIdentifier = "id";
for (int t = 0; t < 110; t++) {
domainEvents.add(new GenericDomainEventMessage(aggregateIdentifier, (long) t,
"Mock contents", MetaData.emptyInstance()));
}
testSubject.appendEvents("test", new SimpleDomainEventStream(domainEvents));
final Serializer serializer = new Serializer() {
@Override
public SerializedObject serialize(Object object, Class expectedType) {
// this will cause InstantiationError, since it is an interface
Assert.assertEquals(byte[].class, expectedType);
return new SimpleSerializedObject("".getBytes(),
byte[].class,
"failingType",
"0");
}
@Override
public boolean canSerializeTo(Class expectedRepresentation) {
return byte[].class.equals(expectedRepresentation);
}
@Override
public Object deserialize(SerializedObject serializedObject) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public Class classForType(SerializedType type) {
try {
return Class.forName(type.getName());
} catch (ClassNotFoundException e) {
return null;
}
}
};
final DomainEventMessage stubDomainEvent = new GenericDomainEventMessage(
aggregateIdentifier,
(long) 30,
"Mock contents", MetaData.emptyInstance()
);
SnapshotEventEntry entry = new SnapshotEventEntry(
"test", stubDomainEvent,
serializer.serialize(stubDomainEvent.getPayload(), byte[].class),
serializer.serialize(stubDomainEvent.getMetaData(), byte[].class));
entityManager.persist(entry);
entityManager.flush();
entityManager.clear();
DomainEventStream stream = testSubject.readEvents("test", aggregateIdentifier);
assertEquals(0L, stream.peek().getSequenceNumber());
}
@Test
public void testLoad_LargeAmountOfEventsWithSnapshot() {
List> domainEvents = new ArrayList>(110);
String aggregateIdentifier = "id";
for (int t = 0; t < 110; t++) {
domainEvents.add(new GenericDomainEventMessage(aggregateIdentifier, (long) t,
"Mock contents", MetaData.emptyInstance()));
}
testSubject.appendEvents("test", new SimpleDomainEventStream(domainEvents));
testSubject.appendSnapshotEvent("test", new GenericDomainEventMessage(aggregateIdentifier, (long) 30,
"Mock contents",
MetaData.emptyInstance()
));
entityManager.flush();
entityManager.clear();
DomainEventStream events = testSubject.readEvents("test", aggregateIdentifier);
long t = 30L;
while (events.hasNext()) {
DomainEventMessage event = events.next();
assertEquals(t, event.getSequenceNumber());
t++;
}
assertEquals(110L, t);
}
/**
 * Stores aggregate1's events, then a snapshot, then one further event, and verifies that reading the
 * aggregate back yields exactly two messages, each carrying aggregate1's identifier.
 */
@Test
public void testLoadWithSnapshotEvent() {
    testSubject.appendEvents("test", aggregate1.getUncommittedEvents());
    aggregate1.commitEvents();
    entityManager.flush();
    entityManager.clear();
    testSubject.appendSnapshotEvent("test", aggregate1.createSnapshotEvent());
    entityManager.flush();
    entityManager.clear();
    // one more state change after the snapshot was taken
    aggregate1.changeState();
    testSubject.appendEvents("test", aggregate1.getUncommittedEvents());
    aggregate1.commitEvents();

    DomainEventStream actualEventStream = testSubject.readEvents("test", aggregate1.getIdentifier());
    List<DomainEventMessage<?>> domainEvents = new ArrayList<DomainEventMessage<?>>();
    while (actualEventStream.hasNext()) {
        DomainEventMessage<?> next = actualEventStream.next();
        domainEvents.add(next);
        assertEquals(aggregate1.getIdentifier(), next.getAggregateIdentifier());
    }
    // presumably the snapshot plus the single event appended after it
    assertEquals(2, domainEvents.size());
}
/**
 * Reading events of type "Stub" for a freshly generated random identifier must raise an
 * {@link EventStreamNotFoundException}.
 */
@Test(expected = EventStreamNotFoundException.class)
public void testLoadNonExistent() {
testSubject.readEvents("Stub", UUID.randomUUID());
}
/**
 * Appends two batches of events (77 and 23) and verifies that visiting the store without any
 * criteria hands all 100 of them to the visitor.
 */
@Test
public void testVisitAllEvents() {
    final EventVisitor visitor = mock(EventVisitor.class);
    for (int batchSize : new int[]{77, 23}) {
        testSubject.appendEvents("test", new SimpleDomainEventStream(createDomainEvents(batchSize)));
    }

    testSubject.visitEvents(visitor);

    verify(visitor, times(77 + 23)).doWithEvent(isA(DomainEventMessage.class));
}
/**
 * Appends four batches (11, 12, 13 and 14 events) while the Joda-Time clock is pinned to
 * 12:59:59.999, 13:00:00.000, 14:00:00.000 and 14:00:00.001 on 2011-12-18 respectively, then visits
 * the events whose {@code timeStamp} is strictly greater than 13:00 and expects 13 + 14 visits.
 */
@Test
public void testVisitEvents_AfterTimestamp() {
    final DateTime onePM = new DateTime(2011, 12, 18, 13, 0, 0, 0);
    final DateTime[] clockSettings = {
            new DateTime(2011, 12, 18, 12, 59, 59, 999),
            onePM,
            new DateTime(2011, 12, 18, 14, 0, 0, 0),
            new DateTime(2011, 12, 18, 14, 0, 0, 1)
    };
    final int[] batchSizes = {11, 12, 13, 14};
    final EventVisitor visitor = mock(EventVisitor.class);

    for (int i = 0; i < batchSizes.length; i++) {
        // pin the clock first, then create and append the batch
        DateTimeUtils.setCurrentMillisFixed(clockSettings[i].getMillis());
        testSubject.appendEvents("test", new SimpleDomainEventStream(createDomainEvents(batchSizes[i])));
    }
    // hand the clock back to the system before querying
    DateTimeUtils.setCurrentMillisSystem();

    testSubject.visitEvents(testSubject.newCriteriaBuilder().property("timeStamp").greaterThan(onePM), visitor);
    verify(visitor, times(13 + 14)).doWithEvent(isA(DomainEventMessage.class));
}
/**
 * Appends four batches (11, 12, 13 and 14 events) while the Joda-Time clock is pinned to
 * 12:59:59.999, 13:00:00.000, 14:00:00.000 and 14:00:00.001 on 2011-12-18 respectively, then visits
 * the events whose {@code timeStamp} lies between 13:00 and 14:00 (both inclusive) and expects
 * 12 + 13 visits.
 */
@Test
public void testVisitEvents_BetweenTimestamps() {
    EventVisitor eventVisitor = mock(EventVisitor.class);
    DateTimeUtils.setCurrentMillisFixed(new DateTime(2011, 12, 18, 12, 59, 59, 999).getMillis());
    testSubject.appendEvents("test", new SimpleDomainEventStream(createDomainEvents(11)));
    DateTime onePM = new DateTime(2011, 12, 18, 13, 0, 0, 0);
    DateTimeUtils.setCurrentMillisFixed(onePM.getMillis());
    testSubject.appendEvents("test", new SimpleDomainEventStream(createDomainEvents(12)));
    DateTime twoPM = new DateTime(2011, 12, 18, 14, 0, 0, 0);
    DateTimeUtils.setCurrentMillisFixed(twoPM.getMillis());
    testSubject.appendEvents("test", new SimpleDomainEventStream(createDomainEvents(13)));
    DateTimeUtils.setCurrentMillisFixed(new DateTime(2011, 12, 18, 14, 0, 0, 1).getMillis());
    testSubject.appendEvents("test", new SimpleDomainEventStream(createDomainEvents(14)));
    // hand the clock back to the system before querying
    DateTimeUtils.setCurrentMillisSystem();

    CriteriaBuilder criteriaBuilder = testSubject.newCriteriaBuilder();
    testSubject.visitEvents(criteriaBuilder.property("timeStamp").greaterThanEquals(onePM)
                                           .and(criteriaBuilder.property("timeStamp").lessThanEquals(twoPM)),
                            eventVisitor);
    verify(eventVisitor, times(12 + 13)).doWithEvent(isA(DomainEventMessage.class));
}
/**
 * Appends four batches (11, 12, 13 and 14 events) while the Joda-Time clock is pinned to
 * 12:59:59.999, 13:00:00.000, 14:00:00.000 and 14:00:00.001 on 2011-12-18 respectively, then visits
 * the events whose {@code timeStamp} is greater than or equal to 13:00 and expects 12 + 13 + 14 visits.
 */
@Test
public void testVisitEvents_OnOrAfterTimestamp() {
    final DateTime onePM = new DateTime(2011, 12, 18, 13, 0, 0, 0);
    final DateTime[] clockSettings = {
            new DateTime(2011, 12, 18, 12, 59, 59, 999),
            onePM,
            new DateTime(2011, 12, 18, 14, 0, 0, 0),
            new DateTime(2011, 12, 18, 14, 0, 0, 1)
    };
    final int[] batchSizes = {11, 12, 13, 14};
    final EventVisitor visitor = mock(EventVisitor.class);

    for (int i = 0; i < batchSizes.length; i++) {
        // pin the clock first, then create and append the batch
        DateTimeUtils.setCurrentMillisFixed(clockSettings[i].getMillis());
        testSubject.appendEvents("test", new SimpleDomainEventStream(createDomainEvents(batchSizes[i])));
    }
    // hand the clock back to the system before querying
    DateTimeUtils.setCurrentMillisSystem();

    testSubject.visitEvents(testSubject.newCriteriaBuilder().property("timeStamp").greaterThanEquals(onePM),
                            visitor);
    verify(visitor, times(12 + 13 + 14)).doWithEvent(isA(DomainEventMessage.class));
}
/**
 * Appends two messages with the same aggregate identifier ("123") and sequence number (0) and expects
 * the store to raise a {@link ConcurrencyException}.
 */
@Test(expected = ConcurrencyException.class)
public void testStoreDuplicateEvent_WithSqlExceptionTranslator() {
    final GenericDomainEventMessage first =
            new GenericDomainEventMessage("123", 0L, "Mock contents", MetaData.emptyInstance());
    final GenericDomainEventMessage duplicate =
            new GenericDomainEventMessage("123", 0L, "Mock contents", MetaData.emptyInstance());

    testSubject.appendEvents("test", new SimpleDomainEventStream(first, duplicate));
}
/**
 * With the persistence exception resolver set to {@code null}, appending two messages with the same
 * aggregate identifier and sequence number must not surface as a {@link ConcurrencyException}; a
 * {@link PersistenceException} is accepted as long as its message mentions "Constraint".
 * <p/>
 * NOTE(review): the test also passes when no exception is thrown at all — confirm that is intended.
 */
@Test
public void testStoreDuplicateEvent_NoSqlExceptionTranslator() {
    testSubject.setPersistenceExceptionResolver(null);
    try {
        final GenericDomainEventMessage first =
                new GenericDomainEventMessage("123", 0L, "Mock contents", MetaData.emptyInstance());
        final GenericDomainEventMessage duplicate =
                new GenericDomainEventMessage("123", 0L, "Mock contents", MetaData.emptyInstance());
        testSubject.appendEvents("test", new SimpleDomainEventStream(first, duplicate));
    } catch (ConcurrencyException translated) {
        fail("Didn't expect exception to be translated");
    } catch (PersistenceException untranslated) {
        assertTrue("Got the right exception, "
                           + "but the message doesn't seem to mention 'Constraint': " + untranslated.getMessage(),
                   untranslated.getMessage().contains("Constraint"));
    }
}
/**
 * Limits the store to one archived snapshot, appends two snapshots for the same aggregate (one after
 * each state change) and verifies that a single snapshot entry remains, with sequence number 1.
 */
@Test
public void testPrunesSnaphotsWhenNumberOfSnapshotsExceedsConfiguredMaxSnapshotsArchived() {
    testSubject.setMaxSnapshotsArchived(1);
    StubAggregateRoot aggregate = new StubAggregateRoot();

    // first event + first snapshot
    aggregate.changeState();
    testSubject.appendEvents("type", aggregate.getUncommittedEvents());
    aggregate.commitEvents();
    entityManager.flush();
    entityManager.clear();
    testSubject.appendSnapshotEvent("type", aggregate.createSnapshotEvent());
    entityManager.flush();
    entityManager.clear();

    // second event + second snapshot
    aggregate.changeState();
    testSubject.appendEvents("type", aggregate.getUncommittedEvents());
    aggregate.commitEvents();
    entityManager.flush();
    entityManager.clear();
    testSubject.appendSnapshotEvent("type", aggregate.createSnapshotEvent());
    entityManager.flush();
    entityManager.clear();

    // getResultList() returns an untyped List; the query only selects SnapshotEventEntry instances
    @SuppressWarnings({"unchecked"})
    List<SnapshotEventEntry> snapshots =
            entityManager.createQuery("SELECT e FROM SnapshotEventEntry e "
                                              + "WHERE e.type = 'type' "
                                              + "AND e.aggregateIdentifier = :aggregateIdentifier")
                         .setParameter("aggregateIdentifier", aggregate.getIdentifier().toString())
                         .getResultList();
    assertEquals("archived snapshot count", 1L, snapshots.size());
    assertEquals("archived snapshot sequence", 1L, snapshots.iterator().next().getSequenceNumber());
}
/**
 * Replaces the test subject with a JpaEventStore backed by a mocked EventEntryStore and verifies that
 * appending and reading events are delegated to that mock.
 */
@SuppressWarnings({"PrimitiveArrayArgumentToVariableArgMethod", "unchecked"})
@Test
public void testCustomEventEntryStore() {
EventEntryStore eventEntryStore = mock(EventEntryStore.class);
testSubject = new JpaEventStore(new SimpleEntityManagerProvider(entityManager), eventEntryStore);
testSubject.appendEvents("test", new SimpleDomainEventStream(
new GenericDomainEventMessage(UUID.randomUUID(), (long) 0,
"Mock contents", MetaData.emptyInstance()),
new GenericDomainEventMessage(UUID.randomUUID(), (long) 0,
"Mock contents", MetaData.emptyInstance())));
// two messages were appended, so persistEvent is expected exactly twice
verify(eventEntryStore, times(2)).persistEvent(eq("test"), isA(DomainEventMessage.class),
Matchers.any(),
Matchers.any(), same(entityManager));
// forget the recorded interactions before stubbing the read side
reset(eventEntryStore);
GenericDomainEventMessage eventMessage = new GenericDomainEventMessage(
UUID.randomUUID(), 0L, "Mock contents", MetaData.emptyInstance());
// fetchBatch is stubbed to return a single entry, loadLastSnapshotEvent to return no snapshot
when(eventEntryStore.fetchBatch(anyString(), any(), anyInt(), anyInt(),
any(EntityManager.class)))
.thenReturn(new ArrayList(Arrays.asList(new DomainEventEntry(
"Mock", eventMessage,
mockSerializedObject("Mock contents".getBytes()),
mockSerializedObject("Mock contents".getBytes())))));
when(eventEntryStore.loadLastSnapshotEvent(anyString(), any(),
any(EntityManager.class)))
.thenReturn(null);
testSubject.readEvents("test", "1");
// presumably 0 is the first sequence number to fetch and 100 the batch size — TODO confirm
verify(eventEntryStore).fetchBatch("test", "1", 0, 100, entityManager);
verify(eventEntryStore).loadLastSnapshotEvent("test", "1", entityManager);
}
/**
 * Wraps the given bytes in a SimpleSerializedObject with data type {@code byte[]}, type name "mock"
 * and revision "0".
 *
 * @param bytes the content to wrap
 * @return a serialized object holding {@code bytes}
 */
private SerializedObject mockSerializedObject(byte[] bytes) {
return new SimpleSerializedObject(bytes, byte[].class, "mock", "0");
}
private List> createDomainEvents(int numberOfEvents) {
List> events = new ArrayList>();
final Object aggregateIdentifier = UUID.randomUUID();
for (int t = 0; t < numberOfEvents; t++) {
events.add(new GenericDomainEventMessage(
aggregateIdentifier,
t,
new StubStateChangedEvent(), MetaData.emptyInstance()
));
}
return events;
}
/**
 * Minimal aggregate used by the tests: every call to {@link #changeState()} applies one
 * {@link StubStateChangedEvent}.
 */
private static class StubAggregateRoot extends AbstractAnnotatedAggregateRoot {
private static final long serialVersionUID = -3656612830058057848L;
private final Object identifier;
/** Creates an aggregate with a random UUID as identifier. */
private StubAggregateRoot() {
this(UUID.randomUUID());
}
/** Creates an aggregate with the given identifier. */
private StubAggregateRoot(Object identifier) {
this.identifier = identifier;
}
/** Applies a new {@link StubStateChangedEvent} to this aggregate. */
public void changeState() {
apply(new StubStateChangedEvent());
}
@Override
public Object getIdentifier() {
return identifier;
}
// the handler is deliberately empty: the stub keeps no state
@EventHandler
public void handleStateChange(StubStateChangedEvent event) {
}
/**
 * Builds a message for this aggregate's identifier and current version, carrying a fresh
 * {@link StubStateChangedEvent} and empty meta data; the tests store it as a snapshot event.
 */
public DomainEventMessage createSnapshotEvent() {
return new GenericDomainEventMessage(getIdentifier(), getVersion(),
new StubStateChangedEvent(),
MetaData.emptyInstance()
);
}
}
/** Empty event payload used by the stub aggregate and by createDomainEvents. */
private static class StubStateChangedEvent {
private StubStateChangedEvent() {
}
}
/** Empty marker class; NOTE(review): presumably serves as an unsupported aggregate identifier type. */
private static class BadIdentifierType {
}
}
=======
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.eventstore.jpa;
import org.axonframework.common.jpa.SimpleEntityManagerProvider;
import org.axonframework.domain.DomainEventMessage;
import org.axonframework.domain.DomainEventStream;
import org.axonframework.domain.GenericDomainEventMessage;
import org.axonframework.domain.MetaData;
import org.axonframework.domain.SimpleDomainEventStream;
import org.axonframework.eventhandling.annotation.EventHandler;
import org.axonframework.eventsourcing.annotation.AbstractAnnotatedAggregateRoot;
import org.axonframework.eventstore.EventStreamNotFoundException;
import org.axonframework.eventstore.EventVisitor;
import org.axonframework.eventstore.management.CriteriaBuilder;
import org.axonframework.repository.ConcurrencyException;
import org.axonframework.serializer.SerializedObject;
import org.axonframework.serializer.SerializedType;
import org.axonframework.serializer.Serializer;
import org.axonframework.serializer.SimpleSerializedObject;
import org.joda.time.DateTime;
import org.joda.time.DateTimeUtils;
import org.junit.*;
import org.junit.runner.*;
import org.mockito.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.PersistenceException;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
 * @author Allard Buijze
 * @author Frank Versnel
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {
"classpath:/META-INF/spring/db-context.xml",
"classpath:/META-INF/spring/test-context.xml"})
@Transactional()
public class JpaEventStoreTest {
@Autowired
private JpaEventStore testSubject;
@PersistenceContext
private EntityManager entityManager;
private StubAggregateRoot aggregate1;
private StubAggregateRoot aggregate2;
private Object mockAggregateIdentifier;
/**
 * Prepares the fixtures: aggregate1 (random UUID identifier) with 10 applied state changes,
 * aggregate2 with 3, and an emptied DomainEventEntry table.
 */
@Before
public void setUp() {
    mockAggregateIdentifier = UUID.randomUUID();
    aggregate1 = new StubAggregateRoot(mockAggregateIdentifier);
    int remaining = 10;
    while (remaining-- > 0) {
        aggregate1.changeState();
    }

    aggregate2 = new StubAggregateRoot();
    for (int i = 0; i < 3; i++) {
        aggregate2.changeState();
    }

    entityManager.createQuery("DELETE FROM DomainEventEntry").executeUpdate();
}
/**
 * Resets the Joda-Time clock to the system clock after every test, in case a test that fixed the
 * clock did not get to reset it itself.
 */
@After
public void tearDown() {
// just to make sure
DateTimeUtils.setCurrentMillisSystem();
}
/**
 * Appending a message whose aggregate identifier is a {@link BadIdentifierType} instance is expected
 * to fail with an {@link IllegalArgumentException}.
 */
@Test(expected = IllegalArgumentException.class)
public void testStoreAndLoadEvents_BadIdentifierType() {
    final GenericDomainEventMessage badMessage =
            new GenericDomainEventMessage(new BadIdentifierType(), 1, new Object());
    final SimpleDomainEventStream stream = new SimpleDomainEventStream(badMessage);

    testSubject.appendEvents("type", stream);
}
/**
 * Stores aggregate1's uncommitted events plus one event (with meta data) for aggregate2, then checks
 * that reading aggregate1 returns exactly its own events and that a message read back for aggregate2
 * supports {@code withMetaData} (replace) and {@code andMetaData} (merge).
 */
@Test
public void testStoreAndLoadEvents() {
    assertNotNull(testSubject);
    testSubject.appendEvents("test", aggregate1.getUncommittedEvents());
    entityManager.flush();
    assertEquals((long) aggregate1.getUncommittedEventCount(),
                 entityManager.createQuery("SELECT count(e) FROM DomainEventEntry e").getSingleResult());

    // we store some more events to make sure only correct events are retrieved
    testSubject.appendEvents("test", new SimpleDomainEventStream(
            new GenericDomainEventMessage(aggregate2.getIdentifier(),
                                          0,
                                          new Object(),
                                          Collections.singletonMap("key", (Object) "Value"))));
    entityManager.flush();
    entityManager.clear();

    DomainEventStream events = testSubject.readEvents("test", aggregate1.getIdentifier());
    List<DomainEventMessage<?>> actualEvents = new ArrayList<DomainEventMessage<?>>();
    while (events.hasNext()) {
        DomainEventMessage<?> event = events.next();
        // touch payload and meta data of every message that is read back
        event.getPayload();
        event.getMetaData();
        actualEvents.add(event);
    }
    assertEquals(aggregate1.getUncommittedEventCount(), actualEvents.size());

    // we make sure persisted events have the same MetaData alteration logic
    DomainEventStream other = testSubject.readEvents("test", aggregate2.getIdentifier());
    assertTrue(other.hasNext());
    DomainEventMessage messageWithMetaData = other.next();
    DomainEventMessage altered = messageWithMetaData.withMetaData(Collections.singletonMap("key2",
                                                                                           (Object) "value"));
    DomainEventMessage combined = messageWithMetaData.andMetaData(Collections.singletonMap("key2",
                                                                                           (Object) "value"));
    assertTrue(altered.getMetaData().containsKey("key2"));
    altered.getPayload();
    // withMetaData replaces the stored meta data ...
    assertFalse(altered.getMetaData().containsKey("key"));
    assertTrue(altered.getMetaData().containsKey("key2"));
    // ... whereas andMetaData keeps the stored entry and adds the new one
    assertTrue(combined.getMetaData().containsKey("key"));
    assertTrue(combined.getMetaData().containsKey("key2"));
    assertNotNull(messageWithMetaData.getPayload());
    assertNotNull(messageWithMetaData.getMetaData());
    assertFalse(messageWithMetaData.getMetaData().isEmpty());
}
@Test
public void testLoad_LargeAmountOfEvents() {
List> domainEvents = new ArrayList>(110);
String aggregateIdentifier = "id";
for (int t = 0; t < 110; t++) {
domainEvents.add(new GenericDomainEventMessage(aggregateIdentifier, (long) t,
"Mock contents", MetaData.emptyInstance()));
}
testSubject.appendEvents("test", new SimpleDomainEventStream(domainEvents));
entityManager.flush();
entityManager.clear();
DomainEventStream events = testSubject.readEvents("test", aggregateIdentifier);
long t = 0L;
while (events.hasNext()) {
DomainEventMessage event = events.next();
assertEquals(t, event.getSequenceNumber());
t++;
}
assertEquals(110L, t);
}
/**
 * Re-runs {@link #testLoad_LargeAmountOfEvents()} after setting the event store's batch size to 10,
 * presumably so that the 110 stored events have to be fetched in several batches.
 */
@Test
public void testLoad_LargeAmountOfEventsInSmallBatches() {
// NOTE(review): the batch size is changed on the autowired testSubject and never restored here —
// confirm that this cannot leak into other tests.
testSubject.setBatchSize(10);
testLoad_LargeAmountOfEvents();
}
@Test
public void testEntireStreamIsReadOnUnserializableSnapshot_WithException() {
List> domainEvents = new ArrayList>(110);
String aggregateIdentifier = "id";
for (int t = 0; t < 110; t++) {
domainEvents.add(new GenericDomainEventMessage(aggregateIdentifier, (long) t,
"Mock contents", MetaData.emptyInstance()));
}
testSubject.appendEvents("test", new SimpleDomainEventStream(domainEvents));
final Serializer serializer = new Serializer() {
@Override
public SerializedObject serialize(Object object, Class expectedType) {
Assert.assertEquals(byte[].class, expectedType);
return new SimpleSerializedObject("this ain't gonna work".getBytes(), byte[].class, "failingType", 0);
}
@Override
public Object deserialize(SerializedObject serializedObject) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public Class classForType(SerializedType type) {
try {
return Class.forName(type.getName());
} catch (ClassNotFoundException e) {
return null;
}
}
};
final DomainEventMessage stubDomainEvent = new GenericDomainEventMessage(
aggregateIdentifier,
(long) 30,
"Mock contents", MetaData.emptyInstance()
);
SnapshotEventEntry entry = new SnapshotEventEntry(
"test", stubDomainEvent,
serializer.serialize(stubDomainEvent.getPayload(), byte[].class),
serializer.serialize(stubDomainEvent.getMetaData(), byte[].class));
entityManager.persist(entry);
entityManager.flush();
entityManager.clear();
DomainEventStream stream = testSubject.readEvents("test", aggregateIdentifier);
assertEquals(0L, stream.peek().getSequenceNumber());
}
@Test
public void testEntireStreamIsReadOnUnserializableSnapshot_WithError() {
List> domainEvents = new ArrayList>(110);
String aggregateIdentifier = "id";
for (int t = 0; t < 110; t++) {
domainEvents.add(new GenericDomainEventMessage(aggregateIdentifier, (long) t,
"Mock contents", MetaData.emptyInstance()));
}
testSubject.appendEvents("test", new SimpleDomainEventStream(domainEvents));
final Serializer serializer = new Serializer() {
@Override
public SerializedObject serialize(Object object, Class expectedType) {
// this will cause InstantiationError, since it is an interface
Assert.assertEquals(byte[].class, expectedType);
return new SimpleSerializedObject("".getBytes(),
byte[].class,
"failingType",
0);
}
@Override
public Object deserialize(SerializedObject serializedObject) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public Class classForType(SerializedType type) {
try {
return Class.forName(type.getName());
} catch (ClassNotFoundException e) {
return null;
}
}
};
final DomainEventMessage stubDomainEvent = new GenericDomainEventMessage(
aggregateIdentifier,
(long) 30,
"Mock contents", MetaData.emptyInstance()
);
SnapshotEventEntry entry = new SnapshotEventEntry(
"test", stubDomainEvent,
serializer.serialize(stubDomainEvent.getPayload(), byte[].class),
serializer.serialize(stubDomainEvent.getMetaData(), byte[].class));
entityManager.persist(entry);
entityManager.flush();
entityManager.clear();
DomainEventStream stream = testSubject.readEvents("test", aggregateIdentifier);
assertEquals(0L, stream.peek().getSequenceNumber());
}
@Test
public void testLoad_LargeAmountOfEventsWithSnapshot() {
List> domainEvents = new ArrayList>(110);
String aggregateIdentifier = "id";
for (int t = 0; t < 110; t++) {
domainEvents.add(new GenericDomainEventMessage(aggregateIdentifier, (long) t,
"Mock contents", MetaData.emptyInstance()));
}
testSubject.appendEvents("test", new SimpleDomainEventStream(domainEvents));
testSubject.appendSnapshotEvent("test", new GenericDomainEventMessage(aggregateIdentifier, (long) 30,
"Mock contents",
MetaData.emptyInstance()
));
entityManager.flush();
entityManager.clear();
DomainEventStream events = testSubject.readEvents("test", aggregateIdentifier);
long t = 30L;
while (events.hasNext()) {
DomainEventMessage event = events.next();
assertEquals(t, event.getSequenceNumber());
t++;
}
assertEquals(110L, t);
}
/**
 * Stores aggregate1's events, then a snapshot, then one further event, and verifies that reading the
 * aggregate back yields exactly two messages, each carrying aggregate1's identifier.
 */
@Test
public void testLoadWithSnapshotEvent() {
    testSubject.appendEvents("test", aggregate1.getUncommittedEvents());
    aggregate1.commitEvents();
    entityManager.flush();
    entityManager.clear();
    testSubject.appendSnapshotEvent("test", aggregate1.createSnapshotEvent());
    entityManager.flush();
    entityManager.clear();
    // one more state change after the snapshot was taken
    aggregate1.changeState();
    testSubject.appendEvents("test", aggregate1.getUncommittedEvents());
    aggregate1.commitEvents();

    DomainEventStream actualEventStream = testSubject.readEvents("test", aggregate1.getIdentifier());
    List<DomainEventMessage<?>> domainEvents = new ArrayList<DomainEventMessage<?>>();
    while (actualEventStream.hasNext()) {
        DomainEventMessage<?> next = actualEventStream.next();
        domainEvents.add(next);
        assertEquals(aggregate1.getIdentifier(), next.getAggregateIdentifier());
    }
    // presumably the snapshot plus the single event appended after it
    assertEquals(2, domainEvents.size());
}
/**
 * Reading events of type "Stub" for a freshly generated random identifier must raise an
 * {@link EventStreamNotFoundException}.
 */
@Test(expected = EventStreamNotFoundException.class)
public void testLoadNonExistent() {
testSubject.readEvents("Stub", UUID.randomUUID());
}
/**
 * Appends two batches of events (77 and 23) and verifies that visiting the store without any
 * criteria hands all 100 of them to the visitor.
 */
@Test
public void testVisitAllEvents() {
    final EventVisitor visitor = mock(EventVisitor.class);
    for (int batchSize : new int[]{77, 23}) {
        testSubject.appendEvents("test", new SimpleDomainEventStream(createDomainEvents(batchSize)));
    }

    testSubject.visitEvents(visitor);

    verify(visitor, times(77 + 23)).doWithEvent(isA(DomainEventMessage.class));
}
/**
 * Appends four batches (11, 12, 13 and 14 events) while the Joda-Time clock is pinned to
 * 12:59:59.999, 13:00:00.000, 14:00:00.000 and 14:00:00.001 on 2011-12-18 respectively, then visits
 * the events whose {@code timeStamp} is strictly greater than 13:00 and expects 13 + 14 visits.
 */
@Test
public void testVisitEvents_AfterTimestamp() {
    final DateTime onePM = new DateTime(2011, 12, 18, 13, 0, 0, 0);
    final DateTime[] clockSettings = {
            new DateTime(2011, 12, 18, 12, 59, 59, 999),
            onePM,
            new DateTime(2011, 12, 18, 14, 0, 0, 0),
            new DateTime(2011, 12, 18, 14, 0, 0, 1)
    };
    final int[] batchSizes = {11, 12, 13, 14};
    final EventVisitor visitor = mock(EventVisitor.class);

    for (int i = 0; i < batchSizes.length; i++) {
        // pin the clock first, then create and append the batch
        DateTimeUtils.setCurrentMillisFixed(clockSettings[i].getMillis());
        testSubject.appendEvents("test", new SimpleDomainEventStream(createDomainEvents(batchSizes[i])));
    }
    // hand the clock back to the system before querying
    DateTimeUtils.setCurrentMillisSystem();

    testSubject.visitEvents(testSubject.newCriteriaBuilder().property("timeStamp").greaterThan(onePM), visitor);
    verify(visitor, times(13 + 14)).doWithEvent(isA(DomainEventMessage.class));
}
/**
 * Appends four batches (11, 12, 13 and 14 events) while the Joda-Time clock is pinned to
 * 12:59:59.999, 13:00:00.000, 14:00:00.000 and 14:00:00.001 on 2011-12-18 respectively, then visits
 * the events whose {@code timeStamp} lies between 13:00 and 14:00 (both inclusive) and expects
 * 12 + 13 visits.
 */
@Test
public void testVisitEvents_BetweenTimestamps() {
    final DateTime onePM = new DateTime(2011, 12, 18, 13, 0, 0, 0);
    final DateTime twoPM = new DateTime(2011, 12, 18, 14, 0, 0, 0);
    final DateTime[] clockSettings = {
            new DateTime(2011, 12, 18, 12, 59, 59, 999),
            onePM,
            twoPM,
            new DateTime(2011, 12, 18, 14, 0, 0, 1)
    };
    final int[] batchSizes = {11, 12, 13, 14};
    final EventVisitor visitor = mock(EventVisitor.class);

    for (int i = 0; i < batchSizes.length; i++) {
        // pin the clock first, then create and append the batch
        DateTimeUtils.setCurrentMillisFixed(clockSettings[i].getMillis());
        testSubject.appendEvents("test", new SimpleDomainEventStream(createDomainEvents(batchSizes[i])));
    }
    // hand the clock back to the system before querying
    DateTimeUtils.setCurrentMillisSystem();

    final CriteriaBuilder builder = testSubject.newCriteriaBuilder();
    testSubject.visitEvents(builder.property("timeStamp").greaterThanEquals(onePM)
                                   .and(builder.property("timeStamp").lessThanEquals(twoPM)),
                            visitor);
    verify(visitor, times(12 + 13)).doWithEvent(isA(DomainEventMessage.class));
}
@Test
public void testVisitEvents_OnOrAfterTimestamp() {
EventVisitor eventVisitor = mock(EventVisitor.class);
DateTimeUtils.setCurrentMillisFixed(new DateTime(2011, 12, 18, 12, 59, 59, 999).getMillis());
testSubject.appendEvents("test", new SimpleDomainEventStream(createDomainEvents(11)));
DateTime onePM = new DateTime(2011, 12, 18, 13, 0, 0, 0);
DateTimeUtils.setCurrentMillisFixed(onePM.getMillis());
testSubject.appendEvents("test", new SimpleDomainEventStream(createDomainEvents(12)));
new GenericDomainEventMessage(UUID.randomUUID(), (long) 0,
DateTimeUtils.setCurrentMillisFixed(new DateTime(2011, 12, 18, 14, 0, 0, 0).getMillis());
testSubject.appendEvents("test", new SimpleDomainEventStream(createDomainEvents(13)));
DateTimeUtils.setCurrentMillisFixed(new DateTime(2011, 12, 18, 14, 0, 0, 1).getMillis());
testSubject.appendEvents("test", new SimpleDomainEventStream(createDomainEvents(14)));
DateTimeUtils.setCurrentMillisSystem();
CriteriaBuilder criteriaBuilder = testSubject.newCriteriaBuilder();
testSubject.visitEvents(criteriaBuilder.property("timeStamp").greaterThanEquals(onePM), eventVisitor);
verify(eventVisitor, times(12 + 13 + 14)).doWithEvent(isA(DomainEventMessage.class));
}
/**
 * Appends two messages with the same aggregate identifier ("123") and sequence number (0) and expects
 * the store to raise a {@link ConcurrencyException}.
 */
@Test(expected = ConcurrencyException.class)
public void testStoreDuplicateEvent_WithSqlExceptionTranslator() {
    final GenericDomainEventMessage first =
            new GenericDomainEventMessage("123", 0L, "Mock contents", MetaData.emptyInstance());
    final GenericDomainEventMessage duplicate =
            new GenericDomainEventMessage("123", 0L, "Mock contents", MetaData.emptyInstance());

    testSubject.appendEvents("test", new SimpleDomainEventStream(first, duplicate));
}
/**
 * With the persistence exception resolver set to {@code null}, appending two messages with the same
 * aggregate identifier and sequence number must not surface as a {@link ConcurrencyException}; a
 * {@link PersistenceException} is accepted as long as its message mentions "Constraint".
 * <p/>
 * NOTE(review): the test also passes when no exception is thrown at all — confirm that is intended.
 */
@Test
public void testStoreDuplicateEvent_NoSqlExceptionTranslator() {
    testSubject.setPersistenceExceptionResolver(null);
    try {
        final GenericDomainEventMessage first =
                new GenericDomainEventMessage("123", 0L, "Mock contents", MetaData.emptyInstance());
        final GenericDomainEventMessage duplicate =
                new GenericDomainEventMessage("123", 0L, "Mock contents", MetaData.emptyInstance());
        testSubject.appendEvents("test", new SimpleDomainEventStream(first, duplicate));
    } catch (ConcurrencyException translated) {
        fail("Didn't expect exception to be translated");
    } catch (PersistenceException untranslated) {
        assertTrue("Got the right exception, "
                           + "but the message doesn't seem to mention 'Constraint': " + untranslated.getMessage(),
                   untranslated.getMessage().contains("Constraint"));
    }
}
/**
 * Limits the store to one archived snapshot, appends two snapshots for the same aggregate (one after
 * each state change) and verifies that a single snapshot entry remains, with sequence number 1.
 */
@Test
public void testPrunesSnaphotsWhenNumberOfSnapshotsExceedsConfiguredMaxSnapshotsArchived() {
    testSubject.setMaxSnapshotsArchived(1);
    StubAggregateRoot aggregate = new StubAggregateRoot();

    // first event + first snapshot
    aggregate.changeState();
    testSubject.appendEvents("type", aggregate.getUncommittedEvents());
    aggregate.commitEvents();
    entityManager.flush();
    entityManager.clear();
    testSubject.appendSnapshotEvent("type", aggregate.createSnapshotEvent());
    entityManager.flush();
    entityManager.clear();

    // second event + second snapshot
    aggregate.changeState();
    testSubject.appendEvents("type", aggregate.getUncommittedEvents());
    aggregate.commitEvents();
    entityManager.flush();
    entityManager.clear();
    testSubject.appendSnapshotEvent("type", aggregate.createSnapshotEvent());
    entityManager.flush();
    entityManager.clear();

    // getResultList() returns an untyped List; the query only selects SnapshotEventEntry instances
    @SuppressWarnings({"unchecked"})
    List<SnapshotEventEntry> snapshots =
            entityManager.createQuery("SELECT e FROM SnapshotEventEntry e "
                                              + "WHERE e.type = 'type' "
                                              + "AND e.aggregateIdentifier = :aggregateIdentifier")
                         .setParameter("aggregateIdentifier", aggregate.getIdentifier().toString())
                         .getResultList();
    assertEquals("archived snapshot count", 1L, snapshots.size());
    assertEquals("archived snapshot sequence", 1L, snapshots.iterator().next().getSequenceNumber());
}
@SuppressWarnings({"PrimitiveArrayArgumentToVariableArgMethod", "unchecked"})
@Test
public void testCustomEventEntryStore() {
EventEntryStore eventEntryStore = mock(EventEntryStore.class);
testSubject = new JpaEventStore(new SimpleEntityManagerProvider(entityManager), eventEntryStore);
testSubject.appendEvents("test", new SimpleDomainEventStream(
"Mock contents", MetaData.emptyInstance()),
new GenericDomainEventMessage(UUID.randomUUID(), (long) 0,
"Mock contents", MetaData.emptyInstance())));
verify(eventEntryStore, times(2)).persistEvent(eq("test"), isA(DomainEventMessage.class),
Matchers.any(),
Matchers.any(), same(entityManager));
reset(eventEntryStore);
GenericDomainEventMessage eventMessage = new GenericDomainEventMessage(
UUID.randomUUID(), 0L, "Mock contents", MetaData.emptyInstance());
when(eventEntryStore.fetchBatch(anyString(), any(), anyInt(), anyInt(),
any(EntityManager.class)))
.thenReturn(new ArrayList(Arrays.asList(new DomainEventEntry(
"Mock", eventMessage,
mockSerializedObject("Mock contents".getBytes()),
mockSerializedObject("Mock contents".getBytes())))));
when(eventEntryStore.loadLastSnapshotEvent(anyString(), any(),
any(EntityManager.class)))
.thenReturn(null);
testSubject.readEvents("test", "1");
verify(eventEntryStore).fetchBatch("test", "1", 0, 100, entityManager);
verify(eventEntryStore).loadLastSnapshotEvent("test", "1", entityManager);
}
private SerializedObject mockSerializedObject(byte[] bytes) {
return new SimpleSerializedObject(bytes, byte[].class, "java.lang.String", 0);
}
private List> createDomainEvents(int numberOfEvents) {
List> events = new ArrayList>();
final Object aggregateIdentifier = UUID.randomUUID();
for (int t = 0; t < numberOfEvents; t++) {
events.add(new GenericDomainEventMessage(
aggregateIdentifier,
t,
new StubStateChangedEvent(), MetaData.emptyInstance()
));
}
return events;
}
private static class StubAggregateRoot extends AbstractAnnotatedAggregateRoot {
private static final long serialVersionUID = -3656612830058057848L;
private final Object identifier;
private StubAggregateRoot() {
this(UUID.randomUUID());
}
private StubAggregateRoot(Object identifier) {
this.identifier = identifier;
}
public void changeState() {
apply(new StubStateChangedEvent());
}
@Override
public Object getIdentifier() {
return identifier;
}
@EventHandler
public void handleStateChange(StubStateChangedEvent event) {
}
public DomainEventMessage createSnapshotEvent() {
return new GenericDomainEventMessage(getIdentifier(), getVersion(),
new StubStateChangedEvent(),
MetaData.emptyInstance()
);
}
}
private static class StubStateChangedEvent {
private StubStateChangedEvent() {
}
}
private static class BadIdentifierType {
}
}
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f
Solution content
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.eventstore.jpa;
import org.axonframework.common.jpa.SimpleEntityManagerProvider;
import org.axonframework.domain.DomainEventMessage;
import org.axonframework.domain.DomainEventStream;
import org.axonframework.domain.GenericDomainEventMessage;
import org.axonframework.domain.MetaData;
import org.axonframework.domain.SimpleDomainEventStream;
import org.axonframework.eventhandling.annotation.EventHandler;
import org.axonframework.eventsourcing.annotation.AbstractAnnotatedAggregateRoot;
import org.axonframework.eventstore.EventStreamNotFoundException;
import org.axonframework.eventstore.EventVisitor;
import org.axonframework.eventstore.management.CriteriaBuilder;
import org.axonframework.repository.ConcurrencyException;
import org.axonframework.serializer.SerializedObject;
import org.axonframework.serializer.SerializedType;
import org.axonframework.serializer.Serializer;
import org.axonframework.serializer.SimpleSerializedObject;
import org.joda.time.DateTime;
import org.joda.time.DateTimeUtils;
import org.junit.*;
import org.junit.runner.*;
import org.mockito.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.PersistenceException;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
* @author Allard Buijze
* @author Frank Versnel
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {
"classpath:/META-INF/spring/db-context.xml",
"classpath:/META-INF/spring/test-context.xml"})
@Transactional()
public class JpaEventStoreTest {
@Autowired
private JpaEventStore testSubject;
@PersistenceContext
private EntityManager entityManager;
private StubAggregateRoot aggregate1;
private StubAggregateRoot aggregate2;
private Object mockAggregateIdentifier;
/**
 * Creates the two aggregates used by the tests, applying ten state changes to the first and three to the
 * second, and deletes all stored DomainEventEntry rows.
 */
@Before
public void setUp() {
    mockAggregateIdentifier = UUID.randomUUID();
    aggregate1 = new StubAggregateRoot(mockAggregateIdentifier);
    int remaining = 10;
    while (remaining-- > 0) {
        aggregate1.changeState();
    }

    aggregate2 = new StubAggregateRoot();
    for (int change = 0; change < 3; change++) {
        aggregate2.changeState();
    }

    entityManager.createQuery("DELETE FROM DomainEventEntry").executeUpdate();
}
/**
 * Switches Joda-Time back to the system clock, in case a test left a fixed time installed.
 */
@After
public void tearDown() {
    DateTimeUtils.setCurrentMillisSystem();
}
/**
 * Appending an event whose aggregate identifier is a {@link BadIdentifierType} is expected to be rejected
 * with an IllegalArgumentException.
 */
@Test(expected = IllegalArgumentException.class)
public void testStoreAndLoadEvents_BadIdentifierType() {
    GenericDomainEventMessage badlyIdentifiedEvent =
            new GenericDomainEventMessage(new BadIdentifierType(), 1, new Object());
    testSubject.appendEvents("type", new SimpleDomainEventStream(badlyIdentifiedEvent));
}
/**
 * Stores the uncommitted events of aggregate1 plus one extra event for aggregate2, then reads both streams
 * back. Checks that only aggregate1's events come back for aggregate1, and that withMetaData/andMetaData
 * on a message read from the store replace, respectively extend, the stored meta data.
 */
@Test
public void testStoreAndLoadEvents() {
assertNotNull(testSubject);
testSubject.appendEvents("test", aggregate1.getUncommittedEvents());
entityManager.flush();
// at this point the table holds exactly the events appended above (setUp emptied it)
assertEquals((long) aggregate1.getUncommittedEventCount(),
entityManager.createQuery("SELECT count(e) FROM DomainEventEntry e").getSingleResult());
// we store some more events to make sure only correct events are retrieved
testSubject.appendEvents("test", new SimpleDomainEventStream(
new GenericDomainEventMessage(aggregate2.getIdentifier(),
0,
new Object(),
Collections.singletonMap("key", (Object) "Value"))));
entityManager.flush();
// clear the persistence context so the reads below go through the store again
entityManager.clear();
DomainEventStream events = testSubject.readEvents("test", aggregate1.getIdentifier());
List actualEvents = new ArrayList();
while (events.hasNext()) {
DomainEventMessage event = events.next();
// payload and meta data are accessed for every event read back
// NOTE(review): presumably this forces lazy deserialization - confirm against SerializedDomainEventMessage
event.getPayload();
event.getMetaData();
actualEvents.add(event);
}
assertEquals(aggregate1.getUncommittedEventCount(), actualEvents.size());
// we make sure persisted events have the same MetaData alteration logic
DomainEventStream other = testSubject.readEvents("test", aggregate2.getIdentifier());
assertTrue(other.hasNext());
DomainEventMessage messageWithMetaData = other.next();
DomainEventMessage altered = messageWithMetaData.withMetaData(Collections.singletonMap("key2",
(Object) "value"));
DomainEventMessage combined = messageWithMetaData.andMetaData(Collections.singletonMap("key2",
(Object) "value"));
assertTrue(altered.getMetaData().containsKey("key2"));
// the "key2" assertion is repeated after the payload has been accessed
altered.getPayload();
assertFalse(altered.getMetaData().containsKey("key"));
assertTrue(altered.getMetaData().containsKey("key2"));
// andMetaData keeps the stored entry ("key") and adds the new one ("key2")
assertTrue(combined.getMetaData().containsKey("key"));
assertTrue(combined.getMetaData().containsKey("key2"));
// the message originally read from the store still exposes its payload and meta data
assertNotNull(messageWithMetaData.getPayload());
assertNotNull(messageWithMetaData.getMetaData());
assertFalse(messageWithMetaData.getMetaData().isEmpty());
}
@Test
public void testLoad_LargeAmountOfEvents() {
List> domainEvents = new ArrayList>(110);
String aggregateIdentifier = "id";
for (int t = 0; t < 110; t++) {
domainEvents.add(new GenericDomainEventMessage(aggregateIdentifier, (long) t,
"Mock contents", MetaData.emptyInstance()));
}
testSubject.appendEvents("test", new SimpleDomainEventStream(domainEvents));
entityManager.flush();
entityManager.clear();
DomainEventStream events = testSubject.readEvents("test", aggregateIdentifier);
long t = 0L;
while (events.hasNext()) {
DomainEventMessage event = events.next();
assertEquals(t, event.getSequenceNumber());
t++;
}
assertEquals(110L, t);
}
/**
 * Runs {@link #testLoad_LargeAmountOfEvents()} again after setting the event store's batch size to 10.
 */
@Test
public void testLoad_LargeAmountOfEventsInSmallBatches() {
    final int smallBatchSize = 10;
    testSubject.setBatchSize(smallBatchSize);
    testLoad_LargeAmountOfEvents();
}
@Test
public void testEntireStreamIsReadOnUnserializableSnapshot_WithException() {
List> domainEvents = new ArrayList>(110);
String aggregateIdentifier = "id";
for (int t = 0; t < 110; t++) {
domainEvents.add(new GenericDomainEventMessage(aggregateIdentifier, (long) t,
"Mock contents", MetaData.emptyInstance()));
}
testSubject.appendEvents("test", new SimpleDomainEventStream(domainEvents));
final Serializer serializer = new Serializer() {
@Override
public SerializedObject serialize(Object object, Class expectedType) {
Assert.assertEquals(byte[].class, expectedType);
return new SimpleSerializedObject("this ain't gonna work".getBytes(), byte[].class, "failingType", "0");
}
@Override
public boolean canSerializeTo(Class expectedRepresentation) {
return byte[].class.equals(expectedRepresentation);
}
@Override
public Object deserialize(SerializedObject serializedObject) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public Class classForType(SerializedType type) {
try {
return Class.forName(type.getName());
} catch (ClassNotFoundException e) {
return null;
}
}
};
final DomainEventMessage stubDomainEvent = new GenericDomainEventMessage(
aggregateIdentifier,
(long) 30,
"Mock contents", MetaData.emptyInstance()
);
SnapshotEventEntry entry = new SnapshotEventEntry(
"test", stubDomainEvent,
serializer.serialize(stubDomainEvent.getPayload(), byte[].class),
serializer.serialize(stubDomainEvent.getMetaData(), byte[].class));
entityManager.persist(entry);
entityManager.flush();
entityManager.clear();
DomainEventStream stream = testSubject.readEvents("test", aggregateIdentifier);
assertEquals(0L, stream.peek().getSequenceNumber());
}
@Test
public void testEntireStreamIsReadOnUnserializableSnapshot_WithError() {
List> domainEvents = new ArrayList>(110);
String aggregateIdentifier = "id";
for (int t = 0; t < 110; t++) {
domainEvents.add(new GenericDomainEventMessage(aggregateIdentifier, (long) t,
"Mock contents", MetaData.emptyInstance()));
}
testSubject.appendEvents("test", new SimpleDomainEventStream(domainEvents));
final Serializer serializer = new Serializer() {
@Override
public SerializedObject serialize(Object object, Class expectedType) {
// this will cause InstantiationError, since it is an interface
Assert.assertEquals(byte[].class, expectedType);
return new SimpleSerializedObject("".getBytes(),
byte[].class,
"failingType",
"0");
}
@Override
public boolean canSerializeTo(Class expectedRepresentation) {
return byte[].class.equals(expectedRepresentation);
}
@Override
public Object deserialize(SerializedObject serializedObject) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public Class classForType(SerializedType type) {
try {
return Class.forName(type.getName());
} catch (ClassNotFoundException e) {
return null;
}
}
};
final DomainEventMessage stubDomainEvent = new GenericDomainEventMessage(
aggregateIdentifier,
(long) 30,
"Mock contents", MetaData.emptyInstance()
);
SnapshotEventEntry entry = new SnapshotEventEntry(
"test", stubDomainEvent,
serializer.serialize(stubDomainEvent.getPayload(), byte[].class),
serializer.serialize(stubDomainEvent.getMetaData(), byte[].class));
entityManager.persist(entry);
entityManager.flush();
entityManager.clear();
DomainEventStream stream = testSubject.readEvents("test", aggregateIdentifier);
assertEquals(0L, stream.peek().getSequenceNumber());
}
@Test
public void testLoad_LargeAmountOfEventsWithSnapshot() {
List> domainEvents = new ArrayList>(110);
String aggregateIdentifier = "id";
for (int t = 0; t < 110; t++) {
domainEvents.add(new GenericDomainEventMessage(aggregateIdentifier, (long) t,
"Mock contents", MetaData.emptyInstance()));
}
testSubject.appendEvents("test", new SimpleDomainEventStream(domainEvents));
testSubject.appendSnapshotEvent("test", new GenericDomainEventMessage(aggregateIdentifier, (long) 30,
"Mock contents",
MetaData.emptyInstance()
));
entityManager.flush();
entityManager.clear();
DomainEventStream events = testSubject.readEvents("test", aggregateIdentifier);
long t = 30L;
while (events.hasNext()) {
DomainEventMessage event = events.next();
assertEquals(t, event.getSequenceNumber());
t++;
}
assertEquals(110L, t);
}
/**
 * Stores aggregate1's events, then a snapshot, then one further event, and verifies that reading the
 * stream returns exactly two messages, all belonging to aggregate1.
 */
@Test
public void testLoadWithSnapshotEvent() {
testSubject.appendEvents("test", aggregate1.getUncommittedEvents());
aggregate1.commitEvents();
entityManager.flush();
entityManager.clear();
// snapshot taken after the initial events have been committed
testSubject.appendSnapshotEvent("test", aggregate1.createSnapshotEvent());
entityManager.flush();
entityManager.clear();
// one more event, stored after the snapshot
aggregate1.changeState();
testSubject.appendEvents("test", aggregate1.getUncommittedEvents());
aggregate1.commitEvents();
DomainEventStream actualEventStream = testSubject.readEvents("test", aggregate1.getIdentifier());
List domainEvents = new ArrayList();
while (actualEventStream.hasNext()) {
DomainEventMessage next = actualEventStream.next();
domainEvents.add(next);
assertEquals(aggregate1.getIdentifier(), next.getAggregateIdentifier());
}
// two messages are expected in total
// NOTE(review): presumably the snapshot plus the single event stored after it - confirm
assertEquals(2, domainEvents.size());
}
/**
 * Reading events for a freshly generated identifier is expected to raise EventStreamNotFoundException.
 */
@Test(expected = EventStreamNotFoundException.class)
public void testLoadNonExistent() {
    final UUID unknownAggregateIdentifier = UUID.randomUUID();
    testSubject.readEvents("Stub", unknownAggregateIdentifier);
}
/**
 * Appends two batches of events (77 and 23) and verifies that an unfiltered visit hands every one of the
 * 100 events to the visitor.
 */
@Test
public void testVisitAllEvents() {
    EventVisitor visitor = mock(EventVisitor.class);
    final int[] batchSizes = {77, 23};
    for (int batchSize : batchSizes) {
        testSubject.appendEvents("test", new SimpleDomainEventStream(createDomainEvents(batchSize)));
    }

    testSubject.visitEvents(visitor);

    verify(visitor, times(100)).doWithEvent(isA(DomainEventMessage.class));
}
/**
 * Stores four batches (11, 12, 13 and 14 events) with the clock fixed at 12:59:59.999, 13:00:00.000,
 * 14:00:00.000 and 14:00:00.001 respectively, then visits with "timeStamp greaterThan 13:00". Only the
 * last two batches (13 + 14 events) are expected to reach the visitor.
 */
@Test
public void testVisitEvents_AfterTimestamp() {
    EventVisitor visitor = mock(EventVisitor.class);
    final DateTime onePM = new DateTime(2011, 12, 18, 13, 0, 0, 0);
    final DateTime[] storageTimes = {
            new DateTime(2011, 12, 18, 12, 59, 59, 999),
            onePM,
            new DateTime(2011, 12, 18, 14, 0, 0, 0),
            new DateTime(2011, 12, 18, 14, 0, 0, 1)
    };
    final int[] batchSizes = {11, 12, 13, 14};
    for (int i = 0; i < storageTimes.length; i++) {
        DateTimeUtils.setCurrentMillisFixed(storageTimes[i].getMillis());
        testSubject.appendEvents("test", new SimpleDomainEventStream(createDomainEvents(batchSizes[i])));
    }
    DateTimeUtils.setCurrentMillisSystem();

    CriteriaBuilder builder = testSubject.newCriteriaBuilder();
    testSubject.visitEvents(builder.property("timeStamp").greaterThan(onePM), visitor);

    verify(visitor, times(13 + 14)).doWithEvent(isA(DomainEventMessage.class));
}
/**
 * Stores four batches (11, 12, 13 and 14 events) with the clock fixed at 12:59:59.999, 13:00:00.000,
 * 14:00:00.000 and 14:00:00.001 respectively, then visits with
 * "timeStamp greaterThanEquals 13:00 and timeStamp lessThanEquals 14:00". Only the two middle batches
 * (12 + 13 events) are expected to reach the visitor.
 */
@Test
public void testVisitEvents_BetweenTimestamps() {
    EventVisitor visitor = mock(EventVisitor.class);
    final DateTime onePM = new DateTime(2011, 12, 18, 13, 0, 0, 0);
    final DateTime twoPM = new DateTime(2011, 12, 18, 14, 0, 0, 0);
    final DateTime[] storageTimes = {
            new DateTime(2011, 12, 18, 12, 59, 59, 999),
            onePM,
            twoPM,
            new DateTime(2011, 12, 18, 14, 0, 0, 1)
    };
    final int[] batchSizes = {11, 12, 13, 14};
    for (int i = 0; i < storageTimes.length; i++) {
        DateTimeUtils.setCurrentMillisFixed(storageTimes[i].getMillis());
        testSubject.appendEvents("test", new SimpleDomainEventStream(createDomainEvents(batchSizes[i])));
    }
    DateTimeUtils.setCurrentMillisSystem();

    CriteriaBuilder builder = testSubject.newCriteriaBuilder();
    testSubject.visitEvents(builder.property("timeStamp").greaterThanEquals(onePM)
                                   .and(builder.property("timeStamp").lessThanEquals(twoPM)),
                            visitor);

    verify(visitor, times(12 + 13)).doWithEvent(isA(DomainEventMessage.class));
}
/**
 * Stores four batches (11, 12, 13 and 14 events) with the clock fixed at 12:59:59.999, 13:00:00.000,
 * 14:00:00.000 and 14:00:00.001 respectively, then visits with "timeStamp greaterThanEquals 13:00". The
 * last three batches (12 + 13 + 14 events) are expected to reach the visitor.
 */
@Test
public void testVisitEvents_OnOrAfterTimestamp() {
    EventVisitor visitor = mock(EventVisitor.class);
    final DateTime onePM = new DateTime(2011, 12, 18, 13, 0, 0, 0);
    final DateTime[] storageTimes = {
            new DateTime(2011, 12, 18, 12, 59, 59, 999),
            onePM,
            new DateTime(2011, 12, 18, 14, 0, 0, 0),
            new DateTime(2011, 12, 18, 14, 0, 0, 1)
    };
    final int[] batchSizes = {11, 12, 13, 14};
    for (int i = 0; i < storageTimes.length; i++) {
        DateTimeUtils.setCurrentMillisFixed(storageTimes[i].getMillis());
        testSubject.appendEvents("test", new SimpleDomainEventStream(createDomainEvents(batchSizes[i])));
    }
    DateTimeUtils.setCurrentMillisSystem();

    CriteriaBuilder builder = testSubject.newCriteriaBuilder();
    testSubject.visitEvents(builder.property("timeStamp").greaterThanEquals(onePM), visitor);

    verify(visitor, times(12 + 13 + 14)).doWithEvent(isA(DomainEventMessage.class));
}
/**
 * Appending two events that share aggregate identifier "123" and sequence number 0 is expected to fail
 * with a ConcurrencyException.
 */
@Test(expected = ConcurrencyException.class)
public void testStoreDuplicateEvent_WithSqlExceptionTranslator() {
    GenericDomainEventMessage first =
            new GenericDomainEventMessage("123", 0L, "Mock contents", MetaData.emptyInstance());
    GenericDomainEventMessage duplicate =
            new GenericDomainEventMessage("123", 0L, "Mock contents", MetaData.emptyInstance());
    testSubject.appendEvents("test", new SimpleDomainEventStream(first, duplicate));
}
/**
 * With the persistence exception resolver set to {@code null}, appending two events that share aggregate
 * identifier "123" and sequence number 0 must surface a PersistenceException mentioning "Constraint"
 * instead of a translated ConcurrencyException.
 */
@Test
public void testStoreDuplicateEvent_NoSqlExceptionTranslator() {
    testSubject.setPersistenceExceptionResolver(null);
    try {
        testSubject.appendEvents("test", new SimpleDomainEventStream(
                new GenericDomainEventMessage("123", (long) 0,
                                              "Mock contents", MetaData.emptyInstance()),
                new GenericDomainEventMessage("123", (long) 0,
                                              "Mock contents", MetaData.emptyInstance())));
        // Reaching this line means the duplicate was accepted silently; fail instead of passing vacuously.
        fail("Expected a PersistenceException when storing a duplicate event");
    } catch (ConcurrencyException ex) {
        fail("Didn't expect exception to be translated");
    } catch (PersistenceException ex) {
        // A null message must produce an assertion failure, not a NullPointerException.
        final String message = ex.getMessage();
        assertTrue("Got the right exception, "
                           + "but the message doesn't seem to mention 'Constraint': " + message,
                   message != null && message.contains("Constraint"));
    }
}
/**
 * With the maximum number of archived snapshots set to 1, stores two snapshots for the same aggregate
 * (after its first and second state change) and verifies that a single SnapshotEventEntry remains and that
 * it carries sequence number 1.
 */
@Test
public void testPrunesSnaphotsWhenNumberOfSnapshotsExceedsConfiguredMaxSnapshotsArchived() {
    testSubject.setMaxSnapshotsArchived(1);
    StubAggregateRoot aggregate = new StubAggregateRoot();

    // first event and first snapshot
    aggregate.changeState();
    testSubject.appendEvents("type", aggregate.getUncommittedEvents());
    aggregate.commitEvents();
    entityManager.flush();
    entityManager.clear();
    testSubject.appendSnapshotEvent("type", aggregate.createSnapshotEvent());
    entityManager.flush();
    entityManager.clear();

    // second event and second snapshot
    aggregate.changeState();
    testSubject.appendEvents("type", aggregate.getUncommittedEvents());
    aggregate.commitEvents();
    entityManager.flush();
    entityManager.clear();
    testSubject.appendSnapshotEvent("type", aggregate.createSnapshotEvent());
    entityManager.flush();
    entityManager.clear();

    // getResultList() returns a raw List; the query only selects SnapshotEventEntry instances
    @SuppressWarnings({"unchecked"})
    List<SnapshotEventEntry> snapshots =
            entityManager.createQuery("SELECT e FROM SnapshotEventEntry e "
                                              + "WHERE e.type = 'type' "
                                              + "AND e.aggregateIdentifier = :aggregateIdentifier")
                         .setParameter("aggregateIdentifier", aggregate.getIdentifier().toString())
                         .getResultList();
    assertEquals("archived snapshot count", 1L, snapshots.size());
    assertEquals("archived snapshot sequence", 1L, snapshots.iterator().next().getSequenceNumber());
}
/**
 * Replaces the test subject by a JpaEventStore backed by a mocked EventEntryStore and verifies the
 * delegation: appending two events calls persistEvent twice, and reading events calls fetchBatch and
 * loadLastSnapshotEvent with the aggregate type, identifier and entity manager.
 */
@SuppressWarnings({"PrimitiveArrayArgumentToVariableArgMethod", "unchecked"})
@Test
public void testCustomEventEntryStore() {
    EventEntryStore eventEntryStore = mock(EventEntryStore.class);
    testSubject = new JpaEventStore(new SimpleEntityManagerProvider(entityManager), eventEntryStore);
    testSubject.appendEvents("test", new SimpleDomainEventStream(
            new GenericDomainEventMessage(UUID.randomUUID(), (long) 0,
                                          "Mock contents", MetaData.emptyInstance()),
            new GenericDomainEventMessage(UUID.randomUUID(), (long) 0,
                                          "Mock contents", MetaData.emptyInstance())));
    // Explicit type witness: without target typing (pre-Java 8) a bare any() in argument position is
    // inferred as Object and does not match the SerializedObject parameters.
    verify(eventEntryStore, times(2)).persistEvent(eq("test"), isA(DomainEventMessage.class),
                                                   Matchers.<SerializedObject>any(),
                                                   Matchers.<SerializedObject>any(), same(entityManager));

    reset(eventEntryStore);
    GenericDomainEventMessage eventMessage = new GenericDomainEventMessage(
            UUID.randomUUID(), 0L, "Mock contents", MetaData.emptyInstance());
    // the raw ArrayList is deliberate: it is passed to thenReturn(...) with an unchecked conversion
    when(eventEntryStore.fetchBatch(anyString(), any(), anyInt(), anyInt(),
                                    any(EntityManager.class)))
            .thenReturn(new ArrayList(Arrays.asList(new DomainEventEntry(
                    "Mock", eventMessage,
                    mockSerializedObject("Mock contents".getBytes()),
                    mockSerializedObject("Mock contents".getBytes())))));
    when(eventEntryStore.loadLastSnapshotEvent(anyString(), any(),
                                               any(EntityManager.class)))
            .thenReturn(null);

    testSubject.readEvents("test", "1");

    verify(eventEntryStore).fetchBatch("test", "1", 0, 100, entityManager);
    verify(eventEntryStore).loadLastSnapshotEvent("test", "1", entityManager);
}
/**
 * Wraps the given bytes in a SimpleSerializedObject with content type {@code byte[]}, type name
 * "java.lang.String" and revision "0".
 *
 * @param bytes the serialized data to wrap
 * @return a serialized object holding the given bytes
 */
private SerializedObject mockSerializedObject(byte[] bytes) {
    final String typeName = "java.lang.String";
    final String revision = "0";
    return new SimpleSerializedObject(bytes, byte[].class, typeName, revision);
}
private List> createDomainEvents(int numberOfEvents) {
List> events = new ArrayList>();
final Object aggregateIdentifier = UUID.randomUUID();
for (int t = 0; t < numberOfEvents; t++) {
events.add(new GenericDomainEventMessage(
aggregateIdentifier,
t,
new StubStateChangedEvent(), MetaData.emptyInstance()
));
}
return events;
}
/**
 * Minimal aggregate used by the tests: it applies a StubStateChangedEvent on every state change and can
 * produce a message to be stored as a snapshot.
 */
private static class StubAggregateRoot extends AbstractAnnotatedAggregateRoot {

    private static final long serialVersionUID = -3656612830058057848L;

    private final Object identifier;

    /** Creates an aggregate with a random UUID as identifier. */
    private StubAggregateRoot() {
        this(UUID.randomUUID());
    }

    /** Creates an aggregate with the given identifier. */
    private StubAggregateRoot(Object identifier) {
        this.identifier = identifier;
    }

    @Override
    public Object getIdentifier() {
        return identifier;
    }

    /** Applies a new StubStateChangedEvent to this aggregate. */
    public void changeState() {
        apply(new StubStateChangedEvent());
    }

    /** Handler for the applied event; intentionally empty. */
    @EventHandler
    public void handleStateChange(StubStateChangedEvent event) {
    }

    /**
     * Builds a message from this aggregate's identifier and current version, with a StubStateChangedEvent
     * payload and empty meta data.
     */
    public DomainEventMessage createSnapshotEvent() {
        final Object aggregateId = getIdentifier();
        final Long version = getVersion();
        return new GenericDomainEventMessage(aggregateId, version,
                                             new StubStateChangedEvent(),
                                             MetaData.emptyInstance());
    }
}
/**
 * Empty event payload applied by {@link StubAggregateRoot} and used as payload for generated test events.
 */
private static class StubStateChangedEvent {

    private StubStateChangedEvent() {
    }
}
/**
 * Empty class used as the aggregate identifier in testStoreAndLoadEvents_BadIdentifierType, where the
 * event store is expected to reject it.
 */
private static class BadIdentifierType {
}
}
File
JpaEventStoreTest.java
Developer's decision
Manual
Kind of conflict
Annotation
Class declaration
Comment
Import
Package declaration
Chunk
Conflicting content
import org.junit.*;
<<<<<<< HEAD
package org.axonframework.serializer;
import java.io.Serializable;
import static org.junit.Assert.*;
/**
* @author Allard Buijze
*/
public class JavaSerializerTest {
private JavaSerializer testSubject;
@Before
public void setUp() throws Exception {
testSubject = new JavaSerializer();
}
@Test
public void testSerializeAndDeserialize() {
SerializedObject serializedObject = testSubject.serialize(new MySerializableObject("hello"),
byte[].class);
assertEquals(MySerializableObject.class.getName(), serializedObject.getType().getName());
assertEquals(null, serializedObject.getType().getRevision());
Object actualResult = testSubject.deserialize(serializedObject);
assertTrue(actualResult instanceof MySerializableObject);
assertEquals("hello", ((MySerializableObject) actualResult).getSomeProperty());
}
@Test
public void testClassForType() {
Class actual = testSubject.classForType(new SimpleSerializedType(MySerializableObject.class.getName(), "0"));
assertEquals(MySerializableObject.class, actual);
}
@Test
public void testClassForType_UnknownClass() {
assertNull(testSubject.classForType(new SimpleSerializedType("unknown", "0")));
}
private static class MySerializableObject implements Serializable {
private static final long serialVersionUID = 2166108932776672373L;
private String someProperty;
public MySerializableObject(String someProperty) {
this.someProperty = someProperty;
}
public String getSomeProperty() {
return someProperty;
}
}
}
=======
package org.axonframework.serializer;
import org.junit.*;
import java.io.Serializable;
import static org.junit.Assert.*;
/**
* @author Allard Buijze
*/
public class JavaSerializerTest {
private JavaSerializer testSubject;
@Before
public void setUp() throws Exception {
testSubject = new JavaSerializer();
}
@Test
public void testSerializeAndDeserialize() {
SerializedObject serializedObject = testSubject.serialize(new MySerializableObject("hello"),
byte[].class);
assertEquals(MySerializableObject.class.getName(), serializedObject.getType().getName());
assertEquals(0, serializedObject.getType().getRevision());
Object actualResult = testSubject.deserialize(serializedObject);
assertTrue(actualResult instanceof MySerializableObject);
assertEquals("hello", ((MySerializableObject) actualResult).getSomeProperty());
}
@Test
public void testClassForType() {
Class actual = testSubject.classForType(new SimpleSerializedType(MySerializableObject.class.getName(), 0));
assertEquals(MySerializableObject.class, actual);
}
@Test
public void testClassForType_UnknownClass() {
assertNull(testSubject.classForType(new SimpleSerializedType("unknown", 0)));
}
private static class MySerializableObject implements Serializable {
private static final long serialVersionUID = 2166108932776672373L;
private String someProperty;
public MySerializableObject(String someProperty) {
this.someProperty = someProperty;
}
public String getSomeProperty() {
return someProperty;
}
}
}
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f
Solution content
import org.junit.*;
package org.axonframework.serializer;
import java.io.Serializable;
import static org.junit.Assert.*;
/**
 * Tests for {@link JavaSerializer}: a serialization round trip and type-name-to-class resolution.
 *
 * @author Allard Buijze
 */
public class JavaSerializerTest {

    private JavaSerializer testSubject;

    @Before
    public void setUp() throws Exception {
        testSubject = new JavaSerializer();
    }

    /**
     * Serializes an object to bytes, checks the reported type name and (absent) revision, and verifies
     * that deserializing yields an equivalent object.
     */
    @Test
    public void testSerializeAndDeserialize() {
        MySerializableObject original = new MySerializableObject("hello");
        SerializedObject serializedObject = testSubject.serialize(original, byte[].class);

        assertEquals(MySerializableObject.class.getName(), serializedObject.getType().getName());
        assertNull(serializedObject.getType().getRevision());

        Object actualResult = testSubject.deserialize(serializedObject);
        assertTrue(actualResult instanceof MySerializableObject);
        MySerializableObject roundTripped = (MySerializableObject) actualResult;
        assertEquals("hello", roundTripped.getSomeProperty());
    }

    /** A known class name resolves to that class. */
    @Test
    public void testClassForType() {
        SimpleSerializedType knownType = new SimpleSerializedType(MySerializableObject.class.getName(), "0");
        Class actual = testSubject.classForType(knownType);
        assertEquals(MySerializableObject.class, actual);
    }

    /** An unknown type name resolves to {@code null}. */
    @Test
    public void testClassForType_UnknownClass() {
        SimpleSerializedType unknownType = new SimpleSerializedType("unknown", "0");
        assertNull(testSubject.classForType(unknownType));
    }

    /** Simple serializable value holder used as the serialization subject. */
    private static class MySerializableObject implements Serializable {

        private static final long serialVersionUID = 2166108932776672373L;

        private String someProperty;

        public MySerializableObject(String someProperty) {
            this.someProperty = someProperty;
        }

        public String getSomeProperty() {
            return someProperty;
        }
    }
}
assertEquals(SPECIAL__CHAR__STRING, deserialized.getName());
}
<<<<<<< HEAD:core/src/test/java/org/axonframework/serializer/xml/XStreamSerializerTest.java
@Revision("2")
public static class RevisionSpecifiedEvent {
=======
@Revision(2)
public static class RevisionSpecifiedEvent {
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f:core/src/test/java/org/axonframework/serializer/XStreamSerializerTest.java
}
public static class TestEvent {
Solution content
assertEquals(SPECIAL__CHAR__STRING, deserialized.getName());
}
@Revision("2")
public static class RevisionSpecifiedEvent {
}
public static class TestEvent {
<<<<<<< HEAD
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.integrationtests.domain;
import org.axonframework.serializer.SerializedObject;
import org.axonframework.serializer.xml.XStreamSerializer;
import org.junit.*;
import java.io.UnsupportedEncodingException;
import static junit.framework.Assert.assertEquals;
/**
* Test that reproduces a problem where a structured aggregate (containing multiple entities) is not serialized
* properly.
*
* @author Allard Buijze
*/
public class StructuredAggregateSerializationTest {
@Test
public void testSerializeAndDeserializeAggregate() throws UnsupportedEncodingException {
StructuredAggregateRoot aggregateRoot = new StructuredAggregateRoot();
aggregateRoot.invoke();
assertEquals(2, aggregateRoot.getInvocations());
assertEquals(2, aggregateRoot.getEntity().getInvocations());
aggregateRoot.commitEvents();
XStreamSerializer serializer = new XStreamSerializer();
SerializedObject serialized = serializer.serialize(aggregateRoot, byte[].class);
StructuredAggregateRoot deserializedAggregate = (StructuredAggregateRoot) serializer.deserialize(serialized);
deserializedAggregate.invoke();
assertEquals(3, deserializedAggregate.getInvocations());
assertEquals(3, deserializedAggregate.getEntity().getInvocations());
}
}
=======
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.integrationtests.domain;
import org.axonframework.serializer.SerializedObject;
import org.axonframework.serializer.XStreamSerializer;
import org.junit.*;
import java.io.UnsupportedEncodingException;
import static junit.framework.Assert.assertEquals;
/**
* Test that reproduces a problem where a structured aggregate (containing multiple entities) is not serialized
* properly.
*
* @author Allard Buijze
*/
public class StructuredAggregateSerializationTest {
@Test
public void testSerializeAndDeserializeAggregate() throws UnsupportedEncodingException {
StructuredAggregateRoot aggregateRoot = new StructuredAggregateRoot();
aggregateRoot.invoke();
assertEquals(2, aggregateRoot.getInvocations());
assertEquals(2, aggregateRoot.getEntity().getInvocations());
aggregateRoot.commitEvents();
XStreamSerializer serializer = new XStreamSerializer();
SerializedObject serialized = serializer.serialize(aggregateRoot, byte[].class);
StructuredAggregateRoot deserializedAggregate = (StructuredAggregateRoot) serializer.deserialize(serialized);
deserializedAggregate.invoke();
assertEquals(3, deserializedAggregate.getInvocations());
assertEquals(3, deserializedAggregate.getEntity().getInvocations());
}
}
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f
Solution content
/*
* Copyright (c) 2010-2011. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.axonframework.integrationtests.domain;
import org.axonframework.serializer.SerializedObject;
import org.axonframework.serializer.xml.XStreamSerializer;
import org.junit.*;
import java.io.UnsupportedEncodingException;
import static junit.framework.Assert.assertEquals;
/**
 * Test that reproduces a problem where a structured aggregate (containing multiple entities) is not
 * serialized properly.
 *
 * @author Allard Buijze
 */
public class StructuredAggregateSerializationTest {

    /**
     * Invokes the aggregate once, serializes and deserializes it with an XStreamSerializer, invokes the
     * copy once more and checks that the invocation counters of both the root and its entity moved from
     * 2 to 3.
     */
    @Test
    public void testSerializeAndDeserializeAggregate() throws UnsupportedEncodingException {
        StructuredAggregateRoot original = new StructuredAggregateRoot();
        original.invoke();
        assertEquals(2, original.getInvocations());
        assertEquals(2, original.getEntity().getInvocations());
        original.commitEvents();

        XStreamSerializer xStreamSerializer = new XStreamSerializer();
        SerializedObject serializedAggregate = xStreamSerializer.serialize(original, byte[].class);
        Object deserialized = xStreamSerializer.deserialize(serializedAggregate);
        StructuredAggregateRoot restored = (StructuredAggregateRoot) deserialized;

        restored.invoke();
        assertEquals(3, restored.getInvocations());
        assertEquals(3, restored.getEntity().getInvocations());
    }
}
File
StructuredAggregateSerializationTest.java
Developer's decision
Version 1
Kind of conflict
Class declaration
Comment
Import
Package declaration
Chunk
Conflicting content
* @param upcasterChain Set of upcasters to use when an event needs upcasting before de-serialization
* @return The actual DomainEvent
*/
<<<<<<< HEAD:mongo-eventstore/src/main/java/org/axonframework/eventstore/mongo/EventEntry.java
// HEAD variant: builds a single SerializedDomainEventMessage whose payload and meta data are each wrapped in a
// LazyDeserializingObject together with the given serializer. No UpcasterChain is involved in this variant.
@SuppressWarnings("unchecked")
public DomainEventMessage getDomainEvent(Serializer eventSerializer) {
// NOTE(review): "Class>" is not valid Java; presumably a "<?>" wildcard was lost when this text was extracted.
Class> representationType = String.class;
// The representation type defaults to String and switches to DBObject when the payload is a DBObject.
if (serializedPayload instanceof DBObject) {
representationType = DBObject.class;
}
return new SerializedDomainEventMessage(
eventIdentifier,
aggregateIdentifier,
sequenceNumber,
new DateTime(timeStamp),
new LazyDeserializingObject(
new SimpleSerializedObject(serializedPayload, representationType, payoadType, payloadRevision),
eventSerializer),
new LazyDeserializingObject(new SerializedMetaData(serializedMetaData, representationType),
eventSerializer));
=======
// Incoming variant: returns a List produced by SerializedDomainEventMessage.createDomainEventMessages and passes
// the UpcasterChain along. Payload and meta data are always declared with String as representation type here.
public List getDomainEvent(Serializer eventSerializer, UpcasterChain upcasterChain) {
return SerializedDomainEventMessage.createDomainEventMessages(eventSerializer,
eventIdentifier,
aggregateIdentifier,
sequenceNumber,
new DateTime(timeStamp),
new SimpleSerializedObject(
serializedPayload,
String.class,
payoadType,
payloadRevision),
new SerializedMetaData(serializedMetaData,
String.class),
upcasterChain);
>>>>>>> a96e6ce5ae6e63f4be60f80e78e12199f654d19f:incubator/mongo/src/main/java/org/axonframework/eventstore/mongo/EventEntry.java
}
/**
Solution content
* @param upcasterChain Set of upcasters to use when an event needs upcasting before de-serialization
* @return The actual DomainEvent
*/
@SuppressWarnings("unchecked")
public List getDomainEvents(Serializer eventSerializer, UpcasterChain upcasterChain) {
    // Payload and meta data are declared with the same representation type: String by default,
    // DBObject when the payload held by this entry is a DBObject.
    Class<?> representationType = String.class;
    if (serializedPayload instanceof DBObject) {
        representationType = DBObject.class;
    }
    // The serializer and upcaster chain are passed on unchanged; building the resulting messages is
    // delegated entirely to createDomainEventMessages.
    return createDomainEventMessages(eventSerializer, eventIdentifier, aggregateIdentifier, sequenceNumber,
                                     new DateTime(timeStamp),
                                     new SimpleSerializedObject(serializedPayload, representationType,
                                                                payoadType, payloadRevision),
                                     new SerializedMetaData(serializedMetaData, representationType),
                                     upcasterChain);
}
/**