001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.File;
020import java.io.IOException;
021import java.util.Date;
022import java.util.HashSet;
023import java.util.Set;
024import java.util.TreeSet;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027
028import org.apache.activemq.broker.Broker;
029import org.apache.activemq.broker.ConnectionContext;
030import org.apache.activemq.command.Message;
031import org.apache.activemq.command.MessageAck;
032import org.apache.activemq.command.MessageId;
033import org.apache.activemq.command.TransactionId;
034import org.apache.activemq.command.XATransactionId;
035import org.apache.activemq.store.AbstractMessageStore;
036import org.apache.activemq.store.ListenableFuture;
037import org.apache.activemq.store.MessageStore;
038import org.apache.activemq.store.PersistenceAdapter;
039import org.apache.activemq.store.ProxyMessageStore;
040import org.apache.activemq.store.ProxyTopicMessageStore;
041import org.apache.activemq.store.TopicMessageStore;
042import org.apache.activemq.store.TransactionRecoveryListener;
043import org.apache.activemq.store.TransactionStore;
044import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
045import org.apache.activemq.store.kahadb.data.KahaEntryType;
046import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
047import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
048import org.apache.activemq.store.kahadb.disk.journal.Journal;
049import org.apache.activemq.store.kahadb.disk.journal.Location;
050import org.apache.activemq.util.DataByteArrayInputStream;
051import org.apache.activemq.util.DataByteArrayOutputStream;
052import org.apache.activemq.util.IOHelper;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056public class MultiKahaDBTransactionStore implements TransactionStore {
057    static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class);
058    final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter;
059    final ConcurrentMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>();
060    final Set<TransactionId> recoveredPendingCommit = new HashSet<TransactionId>();
061    private Journal journal;
062    private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
063    private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
064
065    public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) {
066        this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter;
067    }
068
069    public MessageStore proxy(final TransactionStore transactionStore, MessageStore messageStore) {
070        return new ProxyMessageStore(messageStore) {
071            @Override
072            public void addMessage(ConnectionContext context, final Message send) throws IOException {
073                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
074            }
075
076            @Override
077            public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
078                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
079            }
080
081            @Override
082            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
083                return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
084            }
085
086            @Override
087            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
088                return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
089            }
090
091            @Override
092            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
093                MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
094            }
095
096            @Override
097            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
098                MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
099            }
100        };
101    }
102
103    public TopicMessageStore proxy(final TransactionStore transactionStore, final TopicMessageStore messageStore) {
104        return new ProxyTopicMessageStore(messageStore) {
105            @Override
106            public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
107                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
108            }
109
110            @Override
111            public void addMessage(ConnectionContext context, final Message send) throws IOException {
112                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
113            }
114
115            @Override
116            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
117                return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
118            }
119
120            @Override
121            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
122                return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
123            }
124
125            @Override
126            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
127                MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
128            }
129
130            @Override
131            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
132                MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
133            }
134
135            @Override
136            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
137                                    MessageId messageId, MessageAck ack) throws IOException {
138                MultiKahaDBTransactionStore.this.acknowledge(transactionStore, context, (TopicMessageStore) getDelegate(), clientId,
139                        subscriptionName, messageId, ack);
140            }
141        };
142    }
143
144    public void deleteAllMessages() {
145        IOHelper.deleteChildren(getDirectory());
146    }
147
148    public int getJournalMaxFileLength() {
149        return journalMaxFileLength;
150    }
151
152    public void setJournalMaxFileLength(int journalMaxFileLength) {
153        this.journalMaxFileLength = journalMaxFileLength;
154    }
155
156    public int getJournalMaxWriteBatchSize() {
157        return journalWriteBatchSize;
158    }
159
160    public void setJournalMaxWriteBatchSize(int journalWriteBatchSize) {
161        this.journalWriteBatchSize = journalWriteBatchSize;
162    }
163
164    public class Tx {
165        private final Set<TransactionStore> stores = new HashSet<TransactionStore>();
166        private int prepareLocationId = 0;
167
168        public void trackStore(TransactionStore store) {
169            stores.add(store);
170        }
171
172        public Set<TransactionStore> getStores() {
173            return stores;
174        }
175
176        public void trackPrepareLocation(Location location) {
177            this.prepareLocationId = location.getDataFileId();
178        }
179
180        public int getPreparedLocationId() {
181            return prepareLocationId;
182        }
183    }
184
185    public Tx getTx(TransactionId txid) {
186        Tx tx = inflightTransactions.get(txid);
187        if (tx == null) {
188            tx = new Tx();
189            inflightTransactions.put(txid, tx);
190        }
191        return tx;
192    }
193
194    public Tx removeTx(TransactionId txid) {
195        return inflightTransactions.remove(txid);
196    }
197
198    @Override
199    public void prepare(TransactionId txid) throws IOException {
200        Tx tx = getTx(txid);
201        for (TransactionStore store : tx.getStores()) {
202            store.prepare(txid);
203        }
204    }
205
206    @Override
207    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
208            throws IOException {
209
210        if (preCommit != null) {
211            preCommit.run();
212        }
213
214        Tx tx = getTx(txid);
215        if (wasPrepared) {
216            for (TransactionStore store : tx.getStores()) {
217                store.commit(txid, true, null, null);
218            }
219        } else {
220            // can only do 1pc on a single store
221            if (tx.getStores().size() == 1) {
222                for (TransactionStore store : tx.getStores()) {
223                    store.commit(txid, false, null, null);
224                }
225            } else {
226                // need to do local 2pc
227                for (TransactionStore store : tx.getStores()) {
228                    store.prepare(txid);
229                }
230                persistOutcome(tx, txid);
231                for (TransactionStore store : tx.getStores()) {
232                    store.commit(txid, true, null, null);
233                }
234                persistCompletion(txid);
235            }
236        }
237        removeTx(txid);
238        if (postCommit != null) {
239            postCommit.run();
240        }
241    }
242
243    public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
244        tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)))));
245    }
246
247    public void persistCompletion(TransactionId txid) throws IOException {
248        store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))));
249    }
250
251    private Location store(JournalCommand<?> data) throws IOException {
252        int size = data.serializedSizeFramed();
253        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
254        os.writeByte(data.type().getNumber());
255        data.writeFramed(os);
256        Location location = journal.write(os.toByteSequence(), true);
257        journal.setLastAppendLocation(location);
258        return location;
259    }
260
261    @Override
262    public void rollback(TransactionId txid) throws IOException {
263        Tx tx = removeTx(txid);
264        if (tx != null) {
265            for (TransactionStore store : tx.getStores()) {
266                store.rollback(txid);
267            }
268        }
269    }
270
271    @Override
272    public void start() throws Exception {
273        journal = new Journal() {
274            @Override
275            protected void cleanup() {
276                super.cleanup();
277                txStoreCleanup();
278            }
279        };
280        journal.setDirectory(getDirectory());
281        journal.setMaxFileLength(journalMaxFileLength);
282        journal.setWriteBatchSize(journalWriteBatchSize);
283        IOHelper.mkdirs(journal.getDirectory());
284        journal.start();
285        recoverPendingLocalTransactions();
286        store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
287    }
288
289    private void txStoreCleanup() {
290        Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet());
291        for (Tx tx : inflightTransactions.values()) {
292            knownDataFileIds.remove(tx.getPreparedLocationId());
293        }
294        try {
295            journal.removeDataFiles(knownDataFileIds);
296        } catch (Exception e) {
297            LOG.error(this + ", Failed to remove tx journal datafiles " + knownDataFileIds);
298        }
299    }
300
301    private File getDirectory() {
302        return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore");
303    }
304
305    @Override
306    public void stop() throws Exception {
307        journal.close();
308        journal = null;
309    }
310
311    private void recoverPendingLocalTransactions() throws IOException {
312        Location location = journal.getNextLocation(null);
313        while (location != null) {
314            process(load(location));
315            location = journal.getNextLocation(location);
316        }
317        recoveredPendingCommit.addAll(inflightTransactions.keySet());
318        LOG.info("pending local transactions: " + recoveredPendingCommit);
319    }
320
321    public JournalCommand<?> load(Location location) throws IOException {
322        DataByteArrayInputStream is = new DataByteArrayInputStream(journal.read(location));
323        byte readByte = is.readByte();
324        KahaEntryType type = KahaEntryType.valueOf(readByte);
325        if (type == null) {
326            throw new IOException("Could not load journal record. Invalid location: " + location);
327        }
328        JournalCommand<?> message = (JournalCommand<?>) type.createMessage();
329        message.mergeFramed(is);
330        return message;
331    }
332
333    public void process(JournalCommand<?> command) throws IOException {
334        switch (command.type()) {
335            case KAHA_PREPARE_COMMAND:
336                KahaPrepareCommand prepareCommand = (KahaPrepareCommand) command;
337                getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo()));
338                break;
339            case KAHA_COMMIT_COMMAND:
340                KahaCommitCommand commitCommand = (KahaCommitCommand) command;
341                removeTx(TransactionIdConversion.convert(commitCommand.getTransactionInfo()));
342                break;
343            case KAHA_TRACE_COMMAND:
344                break;
345            default:
346                throw new IOException("Unexpected command in transaction journal: " + command);
347        }
348    }
349
350
351    @Override
352    public synchronized void recover(final TransactionRecoveryListener listener) throws IOException {
353
354        for (final PersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) {
355            adapter.createTransactionStore().recover(new TransactionRecoveryListener() {
356                @Override
357                public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks) {
358                    try {
359                        getTx(xid).trackStore(adapter.createTransactionStore());
360                    } catch (IOException e) {
361                        LOG.error("Failed to access transaction store: " + adapter + " for prepared xa tid: " + xid, e);
362                    }
363                    listener.recover(xid, addedMessages, acks);
364                }
365            });
366        }
367
368        try {
369            Broker broker = multiKahaDBPersistenceAdapter.getBrokerService().getBroker();
370            // force completion of local xa
371            for (TransactionId txid : broker.getPreparedTransactions(null)) {
372                if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) {
373                    try {
374                        if (recoveredPendingCommit.contains(txid)) {
375                            LOG.info("delivering pending commit outcome for tid: " + txid);
376                            broker.commitTransaction(null, txid, false);
377
378                        } else {
379                            LOG.info("delivering rollback outcome to store for tid: " + txid);
380                            broker.forgetTransaction(null, txid);
381                        }
382                        persistCompletion(txid);
383                    } catch (Exception ex) {
384                        LOG.error("failed to deliver pending outcome for tid: " + txid, ex);
385                    }
386                }
387            }
388        } catch (Exception e) {
389            LOG.error("failed to resolve pending local transactions", e);
390        }
391    }
392
393    void addMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
394            throws IOException {
395        if (message.getTransactionId() != null) {
396            getTx(message.getTransactionId()).trackStore(transactionStore);
397        }
398        destination.addMessage(context, message);
399    }
400
401    ListenableFuture<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
402            throws IOException {
403        if (message.getTransactionId() != null) {
404            getTx(message.getTransactionId()).trackStore(transactionStore);
405            destination.addMessage(context, message);
406            return AbstractMessageStore.FUTURE;
407        } else {
408            return destination.asyncAddQueueMessage(context, message);
409        }
410    }
411
412    ListenableFuture<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
413            throws IOException {
414
415        if (message.getTransactionId() != null) {
416            getTx(message.getTransactionId()).trackStore(transactionStore);
417            destination.addMessage(context, message);
418            return AbstractMessageStore.FUTURE;
419        } else {
420            return destination.asyncAddTopicMessage(context, message);
421        }
422    }
423
424    final void removeMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
425            throws IOException {
426        if (ack.getTransactionId() != null) {
427            getTx(ack.getTransactionId()).trackStore(transactionStore);
428        }
429        destination.removeMessage(context, ack);
430    }
431
432    final void removeAsyncMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
433            throws IOException {
434        if (ack.getTransactionId() != null) {
435            getTx(ack.getTransactionId()).trackStore(transactionStore);
436        }
437        destination.removeAsyncMessage(context, ack);
438    }
439
440    final void acknowledge(final TransactionStore transactionStore, ConnectionContext context, final TopicMessageStore destination,
441                           final String clientId, final String subscriptionName,
442                           final MessageId messageId, final MessageAck ack) throws IOException {
443        if (ack.getTransactionId() != null) {
444            getTx(ack.getTransactionId()).trackStore(transactionStore);
445        }
446        destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
447    }
448
449}