@@ -46,6 +46,7 @@ obj-$(CONFIG_DM_MIRROR) += dm-mirror.o dm-log.o dm-region-hash.o
obj-$(CONFIG_DM_LOG_USERSPACE) += dm-log-userspace.o
obj-$(CONFIG_DM_ZERO) += dm-zero.o
obj-$(CONFIG_DM_REPLICATOR) += dm-replicator.o \
+ dm-repl-log-ringbuffer.o \
dm-log.o dm-registry.o
obj-$(CONFIG_DM_RAID45) += dm-raid45.o dm-log.o dm-memcache.o \
dm-region-hash.o
new file mode 100644
@@ -0,0 +1,4999 @@
+/*
+ * Copyright (C) 2009 Red Hat, Inc. All rights reserved.
+ *
+ * Module Authors: Jeff Moyer (jmoyer@redhat.com)
+ * Heinz Mauelshagen (heinzm@redhat.com)
+ *
+ * This file is released under the GPL.
+ *
+ * "default" device-mapper replication log type implementing a ring buffer
+ * for write IOs, which will be copied across site links to devices.
+ *
+ * A log like this allows for write coalescing enhancements in order
+ * to reduce network traffic at the cost of larger fallbehind windows.
+ */
+
+/*
+ * Locking:
+ * l->io.lock for io (de)queueing / slink manipulation
+ * l->lists.lock for copy contexts moved around lists
+ *
+ * The ringbuffer lock does not need to be held in order to take the io.lock,
+ * but if they are both acquired, the ordering must be as indicated above.
+ */
+
+#include "dm-repl.h"
+#include "dm-registry.h"
+#include "dm-repl-log.h"
+#include "dm-repl-slink.h"
+
+#include <linux/crc32.h>
+#include <linux/dm-io.h>
+#include <linux/kernel.h>
+#include <linux/version.h>
+
+static const char version[] = "v0.028";
+static struct dm_repl_log_type ringbuffer_type;
+
+static struct mutex list_mutex;
+
+#define DM_MSG_PREFIX "dm-repl-log-ringbuffer"
+#define DAEMON DM_MSG_PREFIX "d"
+
+/* Maximum number of site links supported. */
+#define MAX_DEFAULT_SLINKS 2048
+
+#define DEFAULT_BIOS	16	/* Default max number of bios to the ring buffer. */
+
+#define LOG_SIZE_MIN (2 * BIO_MAX_SECTORS)
+#define REGIONS_MAX 32768
+
+/* Later kernels have this macro in bitops.h */
+#ifndef for_each_bit
+#define for_each_bit(bit, addr, size) \
+ for ((bit) = find_first_bit((void *)(addr), (size)); \
+ (bit) < (size); \
+ (bit) = find_next_bit((void *)(addr), (size), (bit) + 1))
+#endif
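+
+/*
+ * Typical use in this file (illustrative):
+ *
+ *	unsigned long slink_nr;
+ *
+ *	for_each_bit(slink_nr, LOG_SLINKS(l), l->slink.max)
+ *		... process the configured slink slink_nr ...
+ */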
+
+#define _BUG_ON_SLINK_NR(l, slink_nr) \
+ do { \
+ BUG_ON(slink_nr < 0); \
+	} while (0)
+
+/* Replicator log metadata version. */
+struct repl_log_version {
+ unsigned major;
+ unsigned minor;
+ unsigned subminor;
+};
+
+/*
+ * Each version of the log code may get a separate source module, so
+ * we store the version information in the .c file.
+ */
+#define DM_REPL_LOG_MAJOR 0
+#define DM_REPL_LOG_MINOR 0
+#define DM_REPL_LOG_MICRO 1
+
+#define DM_REPL_LOG_VERSION \
+ { DM_REPL_LOG_MAJOR, \
+ DM_REPL_LOG_MINOR, \
+ DM_REPL_LOG_MICRO, }
+
+struct version {
+ uint16_t major;
+ uint16_t minor;
+ uint16_t subminor;
+} my_version = DM_REPL_LOG_VERSION;
+
+/* 1 */
+/* Shall be 16 bytes long */
+static const char log_header_magic[] = "dm-replicatorHJM";
+#define MAGIC_LEN (sizeof(log_header_magic) - 1)
+#define HANDLER_LEN MAGIC_LEN
+
+/* Header format on disk */
+struct log_header_disk {
+ uint8_t magic[MAGIC_LEN];
+ uint32_t crc;
+ struct version version;
+ uint64_t size;
+ uint64_t buffer_header; /* sector of first
+ * buffer_header_disk */
+ uint8_t handler_name[HANDLER_LEN];
+ /* Free space. */
+} __attribute__((__packed__));
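+
+/*
+ * On-disk layout of struct log_header_disk (packed, byte offsets):
+ *
+ *	 0 .. 15	magic
+ *	16 .. 19	crc
+ *	20 .. 25	version (major, minor, subminor)
+ *	26 .. 33	size
+ *	34 .. 41	buffer_header
+ *	42 .. 57	handler_name
+ *
+ * The rest of the header sector is currently free space.
+ */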
+
+/* Macros for bitmap access. */
+#define BITMAP_SIZE(l) ((l)->slink.bitmap_size)
+#define BITMAP_ELEMS(l) ((l)->slink.bitmap_elems)
+#define BITMAP_ELEMS_MAX 32
+
+/* Header format in core (only one of these per log device). */
+struct log_header {
+ struct repl_log_version version;
+ sector_t size;
+ sector_t buffer_header;
+
+	/*
+	 * Bit arrays of configured slinks to copy across
+	 * and of those to issue I/O to.
+	 */
+ struct {
+ uint64_t slinks[BITMAP_ELEMS_MAX];
+ uint64_t ios[BITMAP_ELEMS_MAX];
+ uint64_t set_accessible[BITMAP_ELEMS_MAX];
+ uint64_t inaccessible[BITMAP_ELEMS_MAX];
+ } slink_bits;
+};
+#define LOG_SLINKS(l) ((void *) (l)->header.log->slink_bits.slinks)
+#define LOG_SLINKS_IO(l) ((void *) (l)->header.log->slink_bits.ios)
+#define LOG_SLINKS_INACCESSIBLE(l) \
+ ((void *)(l)->header.log->slink_bits.inaccessible)
+#define LOG_SLINKS_SET_ACCESSIBLE(l) \
+ ((void *)(l)->header.log->slink_bits.set_accessible)
+
+static void
+log_header_to_disk(unsigned slinks, void *d_ptr, void *c_ptr)
+{
+ struct log_header_disk *d = d_ptr;
+ struct log_header *c = c_ptr;
+
+ strncpy((char *) d->magic, log_header_magic, MAGIC_LEN);
+ strncpy((char *) d->handler_name,
+ ringbuffer_type.type.name, HANDLER_LEN);
+ d->version.major = cpu_to_le16(c->version.major);
+ d->version.minor = cpu_to_le16(c->version.minor);
+ d->version.subminor = cpu_to_le16(c->version.subminor);
+ d->size = cpu_to_le64(c->size);
+ d->buffer_header = cpu_to_le64(c->buffer_header);
+ d->crc = 0;
+	d->crc = crc32(~0, d, sizeof(*d));
+}
+
+static int
+log_header_to_core(unsigned slinks, void *c_ptr, void *d_ptr)
+{
+ int r;
+ uint32_t crc;
+ struct log_header *c = c_ptr;
+ struct log_header_disk *d = d_ptr;
+
+ r = strncmp((char *) d->magic, log_header_magic, MAGIC_LEN);
+ if (r)
+ return -EINVAL;
+
+	/* Check if acceptable to this replication log handler. */
+ r = strncmp((char *) d->handler_name, ringbuffer_type.type.name,
+ HANDLER_LEN);
+ if (r)
+ return -EINVAL;
+
+ c->version.major = le16_to_cpu(d->version.major);
+ c->version.minor = le16_to_cpu(d->version.minor);
+ c->version.subminor = le16_to_cpu(d->version.subminor);
+ c->size = le64_to_cpu(d->size);
+ c->buffer_header = le64_to_cpu(d->buffer_header);
+ crc = d->crc;
+ d->crc = 0;
+	return (crc == crc32(~0, d, sizeof(*d))) ? 0 : -EINVAL;
+}
+
+/* 1a */
+static const char *buffer_header_magic = "dm-replbufferHJM";
+
+/*
+ * meta-data for the ring buffer, one per replog:
+ *
+ * start: location on disk
+ * head: ring buffer head, first data item to be replicated
+ * tail: points to one after the last data item to be replicated
+ *
+ * The ring buffer is full of data_header(_disk) entries.
+ */
+struct buffer_header_disk {
+ uint8_t magic[MAGIC_LEN];
+ uint32_t crc;
+ struct buffer_disk {
+ uint64_t start;
+ uint64_t head;
+ uint64_t tail;
+ } buffer;
+
+ uint64_t flags;
+ /* Free space. */
+} __attribute__((__packed__));
+
+/*
+ * In-core format of the buffer_header_disk structure
+ *
+ * start, head, and tail are as described above for buffer_header_disk.
+ *
+ * next_avail points to the next available sector for placing a log entry.
+ * It is important to distinguish this from tail, as we can issue I/O to
+ * multiple log entries at a time.
+ *
+ * end is the end sector of the log device
+ *
+ * pending is the number of sectors queued to the log but not yet allocated.
+ * free represents the amount of free space in the log. This number
+ * reflects the free space in the log given the current outstanding I/O's.
+ * In other words, it is the distance between next_avail and head.
+ */
+/*
+ * My guess is that this should be subsumed by the repl_log structure, as
+ * much of the data is copied from there, anyway. The question is just
+ * how to organize it in a readable and efficient way.
+ */
+/* Ring state flag(s). */
+enum ring_status_type {
+ RING_BLOCKED,
+ RING_BUFFER_ERROR,
+ RING_BUFFER_DATA_ERROR,
+ RING_BUFFER_HEADER_ERROR,
+ RING_BUFFER_HEAD_ERROR,
+ RING_BUFFER_TAIL_ERROR,
+ RING_BUFFER_FULL,
+ RING_BUFFER_IO_QUEUED,
+ RING_SUSPENDED
+};
+
+/*
+ * Pool types for:
+ * o ring buffer entries
+ * o data headers.
+ * o disk data headers.
+ * o slink copy contexts
+ */
+enum ring_pool_type {
+ ENTRY, /* Ring buffer entries. */
+ DATA_HEADER, /* Ring buffer data headers. */
+ DATA_HEADER_DISK, /* Ring buffer ondisk data headers. */
+ COPY_CONTEXT, /* Context for any single slink copy. */
+ NR_RING_POOLS,
+};
+
+struct sector_range {
+ sector_t start;
+ sector_t end;
+};
+
+struct ringbuffer {
+ sector_t start; /* Start sector of the log space on disk. */
+ sector_t head; /* Sector of the first log entry. */
+ sector_t tail; /* Sector of the last valid log entry. */
+
+	struct mutex mutex;	/* Mutex held for member updates below. */
+
+ /* The following fields are useful to keep track of in-core state. */
+ sector_t next_avail; /* In-memory tail of the log. */
+ sector_t end; /* 1st sector past end of log device. */
+ sector_t free; /* Free space left in the log. */
+ sector_t pending; /* sectors queued but not allocated */
+
+ struct {
+ unsigned long flags; /* Buffer state flags. */
+ } io;
+
+ /* Dirty sectors for slink0. */
+ struct sector_hash {
+ struct list_head *hash;
+ unsigned buckets;
+ unsigned mask;
+ } busy_sectors;
+
+ /* Waiting for all I/O to be flushed. */
+ wait_queue_head_t flushq;
+ mempool_t *pools[NR_RING_POOLS];
+};
+
+DM_BITOPS(RingBlocked, ringbuffer, RING_BLOCKED)
+DM_BITOPS(RingBufferError, ringbuffer, RING_BUFFER_ERROR)
+DM_BITOPS(RingBufferDataError, ringbuffer, RING_BUFFER_DATA_ERROR)
+DM_BITOPS(RingBufferHeaderError, ringbuffer, RING_BUFFER_HEADER_ERROR)
+DM_BITOPS(RingBufferHeadError, ringbuffer, RING_BUFFER_HEAD_ERROR)
+DM_BITOPS(RingBufferTailError, ringbuffer, RING_BUFFER_TAIL_ERROR)
+DM_BITOPS(RingBufferFull, ringbuffer, RING_BUFFER_FULL)
+DM_BITOPS(RingBufferIOQueued, ringbuffer, RING_BUFFER_IO_QUEUED)
+DM_BITOPS(RingSuspended, ringbuffer, RING_SUSPENDED)
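+
+/*
+ * Judging by their use below, the DM_BITOPS() declarations above are
+ * assumed to generate Name()/SetName()/ClearName()/TestSetName()/
+ * TestClearName() style accessors operating atomically on ring->io.flags,
+ * e.g. (illustrative):
+ *
+ *	if (!TestSetRingBufferFull(ring))
+ *		DMERR("ring buffer full");
+ */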
+
+#define CC_POOL_MIN 4
+#define HEADER_POOL_MIN 32
+#define ENTRY_POOL_MIN 32
+
+static void
+buffer_header_to_disk(unsigned slinks, void *d_ptr, void *c_ptr)
+{
+ struct buffer_header_disk *d = d_ptr;
+ struct ringbuffer *c = c_ptr;
+
+ strncpy((char *) d->magic, buffer_header_magic, MAGIC_LEN);
+ d->buffer.start = cpu_to_le64(to_bytes(c->start));
+ d->buffer.head = cpu_to_le64(to_bytes(c->head));
+ d->buffer.tail = cpu_to_le64(to_bytes(c->tail));
+ d->flags = cpu_to_le64(c->io.flags);
+ d->crc = 0;
+	d->crc = crc32(~0, d, sizeof(*d));
+}
+
+static int
+buffer_header_to_core(unsigned slinks, void *c_ptr, void *d_ptr)
+{
+ int r;
+ uint32_t crc;
+ struct ringbuffer *c = c_ptr;
+ struct buffer_header_disk *d = d_ptr;
+
+ r = strncmp((char *) d->magic, buffer_header_magic, MAGIC_LEN);
+ if (r)
+ return -EINVAL;
+
+ c->start = to_sector(le64_to_cpu(d->buffer.start));
+ c->head = to_sector(le64_to_cpu(d->buffer.head));
+ c->tail = to_sector(le64_to_cpu(d->buffer.tail));
+ c->io.flags = le64_to_cpu(d->flags);
+ crc = d->crc;
+ d->crc = 0;
+	return likely(crc == crc32(~0, d, sizeof(*d))) ? 0 : -EINVAL;
+}
+
+/* 3 */
+/* The requirement is to support devices with 4k sectors. */
+#define HEADER_SECTORS to_sector(4096)
+
+static const char *data_header_magic = "dm-replicdataHJM";
+
+/* FIXME: XXX adjust for larger sector size! */
+#define DATA_HEADER_DISK_SIZE 512
+enum entry_wrap_type { WRAP_NONE, WRAP_DATA, WRAP_NEXT };
+struct data_header_disk {
+ uint8_t magic[MAGIC_LEN];
+ uint32_t crc;
+ uint32_t filler;
+
+ struct {
+ /* Internal namespace to get rid of major/minor. -HJM */
+ uint64_t dev;
+ uint64_t offset;
+ uint64_t size;
+ } region;
+
+ /* Position of header and data on disk in bytes. */
+ struct {
+ uint64_t header; /* Offset of this header */
+ uint64_t data; /* Offset of data (ie. the bio). */
+ } pos;
+
+ uint8_t valid; /* FIXME: XXX this needs to be in memory copy, too */
+ uint8_t wrap; /* Above enum entry_wrap_type. */
+ uint8_t barrier;/* Be prepared for write barrier support. */
+
+ /*
+ * Free space: fill up to offset 256.
+ */
+ uint8_t filler1[189];
+
+ /* Offset 256! */
+ /* Bitmap, bit position set to 0 for uptodate slink */
+ uint64_t slink_bits[BITMAP_ELEMS_MAX];
+
+ /* Free space. */
+} __attribute__((__packed__));
+
+struct data_header {
+ struct list_head list;
+
+ /* Bitmap, bit position set to 0 for uptodate slink. */
+ uint64_t slink_bits[BITMAP_ELEMS_MAX];
+
+ /*
+ * Reference count indicating the number of endios
+ * expected while writing the header and bitmap.
+ */
+ atomic_t cnt;
+
+ struct data_header_region {
+ /* dev, sector, and size are taken from the initial bio. */
+ unsigned long dev;
+ sector_t sector;
+ unsigned size;
+ } region;
+
+ /* Position of header and data on disk and size in sectors. */
+ struct {
+ sector_t header; /* sector of this header on disk */
+ sector_t data; /* Offset of data (ie. the bio). */
+ unsigned data_sectors; /* Useful for sector calculation. */
+ } pos;
+
+ /* Next data or complete entry wraps. */
+ enum entry_wrap_type wrap;
+};
+
+/* Round sectors up to multiples of HEADER_SECTORS. */
+enum distance_type { FULL_SECTORS, DATA_SECTORS };
+static inline sector_t
+_roundup_sectors(unsigned sectors, enum distance_type type)
+{
+ return HEADER_SECTORS *
+ (!!(type == FULL_SECTORS) + dm_div_up(sectors, HEADER_SECTORS));
+}
+
+/* Header + data. */
+static inline sector_t
+roundup_sectors(unsigned sectors)
+{
+ return _roundup_sectors(sectors, FULL_SECTORS);
+}
+
+/* Data only. */
+static inline sector_t
+roundup_data_sectors(unsigned sectors)
+{
+ return _roundup_sectors(sectors, DATA_SECTORS);
+}
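+
+/*
+ * Example with 512 byte sectors, i.e. HEADER_SECTORS = to_sector(4096) = 8:
+ * an 8 sector (4 KiB) bio gives roundup_data_sectors(8) = 8 and
+ * roundup_sectors(8) = 16 (one header block plus the data), while a
+ * 34 sector bio gives 40 data sectors and 48 sectors in total.
+ */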
+
+static void
+data_header_to_disk(unsigned bitmap_elems, void *d_ptr, void *c_ptr)
+{
+ unsigned i = bitmap_elems;
+ struct data_header_disk *d = d_ptr;
+ struct data_header *c = c_ptr;
+
+ BUG_ON(!i);
+
+ strncpy((char *) d->magic, data_header_magic, MAGIC_LEN);
+ d->region.dev = cpu_to_le64(c->region.dev);
+ d->region.offset = cpu_to_le64(to_bytes(c->region.sector));
+ d->region.size = cpu_to_le64(c->region.size);
+
+ /* Xfer bitmap. */
+ while (i--)
+ d->slink_bits[i] = cpu_to_le64(c->slink_bits[i]);
+
+ d->valid = 1;
+ d->wrap = c->wrap;
+ d->pos.header = cpu_to_le64(to_bytes(c->pos.header));
+ d->pos.data = cpu_to_le64(to_bytes(c->pos.data));
+ d->crc = 0;
+	d->crc = crc32(~0, d, sizeof(*d));
+}
+
+static int
+data_header_to_core(unsigned bitmap_elems, void *c_ptr, void *d_ptr)
+{
+ int r;
+ unsigned i = bitmap_elems;
+ uint32_t crc;
+ struct data_header *c = c_ptr;
+ struct data_header_disk *d = d_ptr;
+
+ BUG_ON(!i);
+
+ r = strncmp((char *) d->magic, data_header_magic, MAGIC_LEN);
+ if (r)
+ return -EINVAL;
+
+ c->region.dev = le64_to_cpu(d->region.dev);
+ c->region.sector = to_sector(le64_to_cpu(d->region.offset));
+ c->region.size = le64_to_cpu(d->region.size);
+
+ /* Xfer bitmap. */
+ while (i--)
+ c->slink_bits[i] = le64_to_cpu(d->slink_bits[i]);
+
+ c->pos.header = to_sector(le64_to_cpu(d->pos.header));
+ c->pos.data = to_sector(le64_to_cpu(d->pos.data));
+ c->pos.data_sectors = roundup_data_sectors(to_sector(c->region.size));
+ c->wrap = d->wrap;
+
+ if (unlikely(!d->valid) ||
+ !c->region.size)
+ return -EINVAL;
+
+ crc = d->crc;
+ d->crc = 0;
+	return likely(crc == crc32(~0, d, sizeof(*d))) ? 0 : -EINVAL;
+}
+
+static inline void
+slink_set_bit(int bit, uint64_t *ptr)
+{
+ set_bit(bit, (unsigned long *)ptr);
+}
+
+static inline void
+slink_clear_bit(int bit, uint64_t *ptr)
+{
+ clear_bit(bit, (unsigned long *)ptr);
+}
+
+static inline int
+slink_test_bit(int bit, uint64_t *ptr)
+{
+ return test_bit(bit, (unsigned long *)ptr);
+}
+
+
+/* entry list types and access macros. */
+enum entry_list_type {
+	E_BUSY_HASH,	/* Busy entries hash. */
+	E_COPY_CONTEXT,	/* Copies across slinks in progress for entry. */
+ E_ORDERED, /* Ordered for advancing the ring buffer head. */
+ E_WRITE_OR_COPY,/* Add to l->lists.l[L_ENTRY_RING_WRITE/L_SLINK_COPY] */
+ E_NR_LISTS,
+};
+#define E_BUSY_HASH_LIST(entry) (entry->lists.l + E_BUSY_HASH)
+#define E_COPY_CONTEXT_LIST(entry) (entry->lists.l + E_COPY_CONTEXT)
+#define E_ORDERED_LIST(entry) (entry->lists.l + E_ORDERED)
+#define E_WRITE_OR_COPY_LIST(entry) (entry->lists.l + E_WRITE_OR_COPY)
+
+/*
+ * Container for the data_header and the associated data pages.
+ */
+struct ringbuffer_entry {
+ struct {
+ struct list_head l[E_NR_LISTS];
+ } lists;
+
+ struct ringbuffer *ring; /* Back pointer. */
+
+ /* Reference count. */
+ atomic_t ref;
+
+ /*
+ * Reference count indicating the number of endios expected
+ * while writing its header and data to the ring buffer log
+ * -or- future use:
+	 * how many copies across site links are active and how many
+	 * reads are being satisfied from the entry.
+ */
+ atomic_t endios;
+
+ struct entry_data {
+ struct data_header *header;
+ struct data_header_disk *disk_header;
+ struct {
+ unsigned long data;
+ unsigned long header;
+ } error;
+ } data;
+
+ struct {
+ struct bio *read; /* bio to read. */
+ struct bio *write; /* Original bio to write. */
+ } bios;
+
+ struct {
+		/* Bitmask of slinks the entry has active copies across. */
+ uint64_t ios[BITMAP_ELEMS_MAX];
+		/* Bitmask of synchronous slinks for endio. */
+ /* FIXME: drop in favour of slink inquiry of sync state ? */
+ uint64_t sync[BITMAP_ELEMS_MAX];
+ /* Bitmask of slinks with errors. */
+ uint64_t error[BITMAP_ELEMS_MAX];
+ } slink_bits;
+};
+#define ENTRY_SLINKS(entry)	((void *) (entry)->data.header->slink_bits)
+#define ENTRY_IOS(entry) ((void *) (entry)->slink_bits.ios)
+#define ENTRY_SYNC(entry) ((entry)->slink_bits.sync)
+#define ENTRY_ERROR(entry) ((entry)->slink_bits.error)
+
+/* FIXME: XXX
+ * For now, the copy context has a backpointer to the ring buffer entry.
+ * This means that a ring buffer entry has to remain in memory until all
+ * of the slink copies have finished. Heinz, you mentioned that this was
+ * not a good idea. I'm open to suggestions on how better to organize this.
+ */
+enum error_type { ERR_DISK, ERR_RAM, NR_ERR_TYPES };
+struct slink_copy_error {
+ int read;
+ int write;
+};
+
+struct slink_copy_context {
+ /*
+ * List first points to the copy context list in the ring buffer
+ * entry. Then, upon completion it gets moved to the slink endios
+ * list.
+ */
+ struct list_head list;
+ atomic_t cnt;
+ struct ringbuffer_entry *entry;
+ struct dm_repl_slink *slink;
+ struct slink_copy_error error[NR_ERR_TYPES];
+ unsigned long start_jiffies;
+};
+
+/* Development statistics. */
+struct stats {
+ atomic_t io[2];
+ atomic_t writes_pending;
+ atomic_t hash_elem;
+
+ unsigned copy[2];
+ unsigned wrap;
+ unsigned hash_insert;
+ unsigned hash_insert_max;
+ unsigned stall;
+};
+
+/* Per site link measure/state. */
+enum slink_status_type {
+ SS_SYNC, /* slink fell behind an I/O threshold. */
+ SS_TEARDOWN, /* Flag site link teardown. */
+};
+struct slink_state {
+ unsigned slink_nr;
+ struct repl_log *l;
+
+ struct {
+
+ /*
+ * Difference of time (measured in jiffies) between the
+ * first outstanding copy for this slink and the last
+ * outstanding copy.
+ */
+ unsigned long head_jiffies;
+
+ /* Number of ios/sectors currently copy() to this slink. */
+ struct {
+ sector_t sectors;
+ uint64_t ios;
+ } outstanding;
+ } fb;
+
+ struct {
+		unsigned long flags;	/* slink_state flags. */
+
+ /* slink+I/O teardown synchronization. */
+ wait_queue_head_t waiters;
+ atomic_t in_flight;
+ } io;
+};
+DM_BITOPS(SsSync, slink_state, SS_SYNC)
+DM_BITOPS(SsTeardown, slink_state, SS_TEARDOWN)
+
+enum open_type { OT_AUTO, OT_OPEN, OT_CREATE };
+enum replog_status_type {
+ LOG_DEVEL_STATS, /* Turn on development stats. */
+ LOG_INITIALIZED, /* Log initialization finished. */
+ LOG_RESIZE, /* Log resize requested. */
+};
+
+/* repl_log list types and access macros. */
+enum replog_list_type {
+ L_REPLOG, /* Linked list of replogs. */
+	L_SLINK_COPY,	/* Entries to copy across slinks. */
+ L_SLINK_ENDIO, /* Entries to endio process. */
+ L_ENTRY_RING_WRITE, /* Entries to write to ring buffer */
+ L_ENTRY_ORDERED, /* Ordered list of entries (write fidelity). */
+ L_NR_LISTS,
+};
+#define L_REPLOG_LIST(l) (l->lists.l + L_REPLOG)
+#define L_SLINK_COPY_LIST(l) (l->lists.l + L_SLINK_COPY)
+#define L_SLINK_ENDIO_LIST(l) (l->lists.l + L_SLINK_ENDIO)
+#define L_ENTRY_RING_WRITE_LIST(l) (l->lists.l + L_ENTRY_RING_WRITE)
+#define L_ENTRY_ORDERED_LIST(l) (l->lists.l + L_ENTRY_ORDERED)
+
+/* The replication log in core. */
+struct repl_log {
+ struct dm_repl_log *log;
+
+ struct kref ref; /* Pin count. */
+
+ struct dm_repl_log *replog;
+ struct dm_repl_slink *slink0;
+
+ struct stats stats; /* Development statistics. */
+
+ struct repl_params {
+ enum open_type open_type;
+ unsigned count;
+ struct repl_dev {
+ struct dm_dev *dm_dev;
+ sector_t start;
+ sector_t size;
+ } dev;
+ } params;
+
+ struct {
+ spinlock_t lock; /* Lock on pending list below. */
+ struct bio_list in; /* pending list of bios */
+ struct dm_io_client *io_client;
+ struct workqueue_struct *wq;
+ struct work_struct ws;
+ unsigned long flags; /* State flags. */
+ /* Preallocated header. We only need one at a time.*/
+ struct buffer_header_disk *buffer_header_disk;
+ } io;
+
+ struct ringbuffer ringbuffer;
+
+ /* Useful for runtime performance on bitmap accesses. */
+ struct {
+ int count; /* Actual # of slinks in this replog. */
+ unsigned max; /* Actual maximum added site link #. */
+ unsigned bitmap_elems; /* Actual used elements in bitmaps. */
+ unsigned bitmap_size; /* Actual bitmap size (for memcpy). */
+ } slink;
+
+ struct {
+ struct log_header *log;
+ } header;
+
+ struct {
+ /* List of site links. */
+ struct dm_repl_log_slink_list slinks;
+
+ /*
+ * A single lock for all of these lists should be sufficient
+		 * given that each list is processed in turn (see do_log()).
+ *
+ * The lock has to protect the L_SLINK_ENDIO list
+ * and the entry ring write lists below.
+ *
+		 * We need to streamline these lists vs. the lock. -HJM
+ * The others are accessed by one thread only. -HJM
+ */
+ rwlock_t lock;
+
+ /*
+ * Lists for entry slink copies, entry endios,
+ * ring buffer writes and ordered entries.
+ */
+ struct list_head l[L_NR_LISTS];
+ } lists;
+
+ /* Caller callback function and context. */
+ struct replog_notify {
+ dm_repl_notify_fn fn;
+ void *context;
+ } notify;
+};
+
+#define _SET_AND_BUG_ON_L(l, log) \
+ do { \
+ _BUG_ON_PTR(log); \
+ (l) = (log)->context; \
+ _BUG_ON_PTR(l); \
+	} while (0)
+
+/* Define log bitops. */
+DM_BITOPS(LogDevelStats, repl_log, LOG_DEVEL_STATS);
+DM_BITOPS(LogInitialized, repl_log, LOG_INITIALIZED);
+DM_BITOPS(LogResize, repl_log, LOG_RESIZE);
+
+static inline struct repl_log *
+ringbuffer_repl_log(struct ringbuffer *ring)
+{
+ return container_of(ring, struct repl_log, ringbuffer);
+}
+
+static inline struct block_device *
+repl_log_bdev(struct repl_log *l)
+{
+ return l->params.dev.dm_dev->bdev;
+}
+
+static inline struct block_device *
+ringbuffer_bdev(struct ringbuffer *ring)
+{
+ return repl_log_bdev(ringbuffer_repl_log(ring));
+}
+
+/* Check the slink bit array for busy bits. */
+static inline int
+entry_busy(struct repl_log *l, void *bits)
+{
+ return find_first_bit(bits, l->slink.max) < l->slink.max;
+}
+
+static inline int
+entry_endios_pending(struct ringbuffer_entry *entry)
+{
+ return entry_busy(ringbuffer_repl_log(entry->ring), ENTRY_IOS(entry));
+}
+
+static inline int
+ss_io(struct slink_state *ss)
+{
+ _BUG_ON_PTR(ss);
+ return atomic_read(&ss->io.in_flight);
+}
+
+static void
+ss_io_get(struct slink_state *ss)
+{
+ BUG_ON(!ss || IS_ERR(ss));
+ atomic_inc(&ss->io.in_flight);
+}
+
+static void
+ss_io_put(struct slink_state *ss)
+{
+ _BUG_ON_PTR(ss);
+ if (atomic_dec_and_test((&ss->io.in_flight)))
+ wake_up(&ss->io.waiters);
+ else
+ BUG_ON(ss_io(ss) < 0);
+}
+
+static void
+ss_wait_on_io(struct slink_state *ss)
+{
+ _BUG_ON_PTR(ss);
+ while (ss_io(ss)) {
+ flush_workqueue(ss->l->io.wq);
+ wait_event(ss->io.waiters, !ss_io(ss));
+ }
+}
+
+/* Wait for I/O to finish on all site links. */
+static inline void
+ss_all_wait_on_ios(struct repl_log *l)
+{
+ unsigned long slink_nr;
+
+ for_each_bit(slink_nr, LOG_SLINKS(l), l->slink.max) {
+ struct dm_repl_slink *slink =
+ l->slink0->ops->slink(l->replog, slink_nr);
+ struct slink_state *ss;
+
+ if (IS_ERR(slink)) {
+ DMERR_LIMIT("%s slink error", __func__);
+ continue;
+ }
+
+ ss = slink->caller;
+ _BUG_ON_PTR(ss);
+ ss_wait_on_io(ss);
+ }
+}
+
+static inline struct dm_io_client *
+replog_io_client(struct repl_log *l)
+{
+ return l->io.io_client;
+}
+
+static inline struct repl_log *
+dev_repl_log(struct repl_dev *dev)
+{
+ return container_of(dev, struct repl_log, params.dev);
+}
+
+/* Define mempool_{alloc,free}() functions for the ring buffer pools. */
+#define ALLOC_FREE_ELEM(name, type) \
+static void *\
+alloc_ ## name(struct ringbuffer *ring) \
+{ \
+ return mempool_alloc(ring->pools[(type)], GFP_KERNEL); \
+} \
+\
+static inline void \
+free_ ## name(void *ptr, struct ringbuffer *ring) \
+{ \
+ _BUG_ON_PTR(ptr); \
+ mempool_free(ptr, ring->pools[(type)]); \
+}
+
+ALLOC_FREE_ELEM(entry, ENTRY)
+ALLOC_FREE_ELEM(header, DATA_HEADER)
+ALLOC_FREE_ELEM(data_header_disk, DATA_HEADER_DISK)
+ALLOC_FREE_ELEM(copy_context, COPY_CONTEXT)
+#undef ALLOC_FREE_ELEM
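+
+/*
+ * E.g. ALLOC_FREE_ELEM(entry, ENTRY) above expands to
+ *
+ *	static void *alloc_entry(struct ringbuffer *ring)
+ *	{
+ *		return mempool_alloc(ring->pools[ENTRY], GFP_KERNEL);
+ *	}
+ *
+ *	static inline void free_entry(void *ptr, struct ringbuffer *ring)
+ *	{
+ *		_BUG_ON_PTR(ptr);
+ *		mempool_free(ptr, ring->pools[ENTRY]);
+ *	}
+ *
+ * and likewise for the data header, disk data header and copy context pools.
+ */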
+
+/* Additional alloc/free functions for header_io() abstraction. */
+/* No need to allocate bitmaps, because they are transient. */
+static void *
+alloc_log_header_disk(struct ringbuffer *ring)
+{
+ return kmalloc(to_bytes(1), GFP_KERNEL);
+}
+
+static void
+free_log_header_disk(void *ptr, struct ringbuffer *ring)
+{
+ kfree(ptr);
+}
+
+/* Dummies to allow for abstraction. */
+static void *
+alloc_buffer_header_disk(struct ringbuffer *ring)
+{
+ return ringbuffer_repl_log(ring)->io.buffer_header_disk;
+}
+
+static void
+free_buffer_header_disk(void *ptr, struct ringbuffer *ring)
+{
+}
+
+/*********************************************************************
+ * Busy entries hash.
+ */
+/* Initialize/destroy sector hash. */
+static int
+sector_hash_init(struct sector_hash *hash, sector_t size)
+{
+ unsigned buckets = roundup_pow_of_two(size / BIO_MAX_SECTORS);
+
+ if (buckets > 4) {
+ if (buckets > REGIONS_MAX)
+ buckets = REGIONS_MAX;
+
+ buckets /= 4;
+ }
+
+	/* Allocate sector hash. */
+ hash->hash = vmalloc(buckets * sizeof(*hash->hash));
+ if (!hash->hash)
+ return -ENOMEM;
+
+ hash->buckets = hash->mask = buckets;
+ hash->mask--;
+
+ /* Initialize buckets. */
+ while (buckets--)
+ INIT_LIST_HEAD(hash->hash + buckets);
+
+ return 0;
+}
+
+static void
+sector_hash_exit(struct sector_hash *hash)
+{
+ if (hash->hash) {
+ vfree(hash->hash);
+ hash->hash = NULL;
+ }
+}
+
+/* Hash function. */
+static inline struct list_head *
+hash_bucket(struct sector_hash *hash, sector_t sector)
+{
+ sector_div(sector, BIO_MAX_SECTORS);
+ return hash->hash + (unsigned) (sector & hash->mask);
+}
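+
+/*
+ * The sector is first scaled down by BIO_MAX_SECTORS and then masked into
+ * the power-of-two bucket count, so all sectors within one BIO_MAX_SECTORS
+ * sized window share a bucket.  Since a single bio spans at most
+ * BIO_MAX_SECTORS, it can touch at most two adjacent buckets, which is what
+ * ringbuffer_writes_pending() below relies on.
+ */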
+
+/* Insert an entry into a sector hash. */
+static inline void
+sector_hash_elem_insert(struct sector_hash *hash,
+ struct ringbuffer_entry *entry)
+{
+ struct repl_log *l;
+ struct stats *s;
+ struct list_head *bucket =
+ hash_bucket(hash, entry->data.header->region.sector);
+
+ BUG_ON(!bucket);
+ _BUG_ON_PTR(entry->ring);
+ l = ringbuffer_repl_log(entry->ring);
+ s = &l->stats;
+
+ BUG_ON(!list_empty(E_BUSY_HASH_LIST(entry)));
+ list_add_tail(E_BUSY_HASH_LIST(entry), bucket);
+
+ atomic_inc(&s->hash_elem);
+ if (++s->hash_insert > s->hash_insert_max)
+ s->hash_insert_max = s->hash_insert;
+}
+
+/* Return first sector # of bio. */
+static inline sector_t
+bio_begin(struct bio *bio)
+{
+ return bio->bi_sector;
+}
+
+/* Return last sector # of bio. */
+static inline sector_t bio_end(struct bio *bio)
+{
+ return bio_begin(bio) + bio_sectors(bio);
+}
+
+/* Round up size in bytes to sectors. */
+static inline sector_t round_up_to_sector(unsigned size)
+{
+ return to_sector(dm_round_up(size, to_bytes(1)));
+}
+
+/* Check if two sector ranges overlap. */
+static inline int
+_ranges_overlap(struct sector_range *r1, struct sector_range *r2)
+{
+ return r1->start >= r2->start &&
+ r1->start < r2->end;
+}
+
+static inline int
+ranges_overlap(struct sector_range *elem_range, struct sector_range *bio_range)
+{
+ return _ranges_overlap(elem_range, bio_range) ||
+ _ranges_overlap(bio_range, elem_range);
+}
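+
+/*
+ * Both directions are checked because _ranges_overlap() only detects r1
+ * starting inside r2.  E.g. for r1 = [0, 8) and r2 = [4, 16),
+ * _ranges_overlap(r1, r2) is false (0 < 4), but _ranges_overlap(r2, r1)
+ * is true (4 >= 0 && 4 < 8), so ranges_overlap() still reports the overlap.
+ */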
+
+/* Take out a reference on an entry. */
+static inline void
+entry_get(struct ringbuffer_entry *entry)
+{
+ atomic_inc(&entry->ref);
+}
+
+/*
+ * Check if bio's address range has writes pending.
+ *
+ * Must be called with the read hash lock held.
+ */
+static int
+ringbuffer_writes_pending(struct sector_hash *hash, struct bio *bio,
+ struct list_head *buckets[2])
+{
+ int r = 0;
+ unsigned end, i;
+ struct ringbuffer_entry *entry;
+ /* Setup a range for the bio. */
+ struct sector_range bio_range = {
+ .start = bio_begin(bio),
+ .end = bio_end(bio),
+ }, entry_range;
+
+ buckets[0] = hash_bucket(hash, bio_range.start);
+ buckets[1] = hash_bucket(hash, bio_range.end);
+ if (buckets[0] == buckets[1]) {
+ end = 1;
+ buckets[1] = NULL;
+ } else
+ end = 2;
+
+ for (i = 0; i < end; i++) {
+ /* Walk the entries checking for overlaps. */
+ list_for_each_entry_reverse(entry, buckets[i],
+ lists.l[E_BUSY_HASH]) {
+ entry_range.start = entry->data.header->region.sector;
+ entry_range.end = entry_range.start +
+ round_up_to_sector(entry->data.header->region.size);
+
+ if (ranges_overlap(&entry_range, &bio_range))
+ return atomic_read(&entry->endios) ? -EBUSY : 1;
+ }
+ }
+
+ return r;
+}
+
+/* Clear a sector range busy. */
+static void
+entry_put(struct ringbuffer_entry *entry)
+{
+ _BUG_ON_PTR(entry);
+
+ if (atomic_dec_and_test(&entry->ref)) {
+ struct ringbuffer *ring = entry->ring;
+ struct stats *s;
+ struct repl_log *l;
+
+ _BUG_ON_PTR(ring);
+ l = ringbuffer_repl_log(ring);
+ s = &l->stats;
+
+ /*
+ * We don't need locking here because the last
+ * put is carried out in daemon context.
+ */
+ BUG_ON(list_empty(E_BUSY_HASH_LIST(entry)));
+ list_del_init(E_BUSY_HASH_LIST(entry));
+
+ atomic_dec(&s->hash_elem);
+ s->hash_insert--;
+ } else
+ BUG_ON(atomic_read(&entry->ref) < 0);
+}
+
+static inline void
+sector_range_clear_busy(struct ringbuffer_entry *entry)
+{
+ entry_put(entry);
+}
+
+/*
+ * Mark a sector range start and length busy.
+ *
+ * Caller has to serialize calls.
+ */
+static void
+sector_range_mark_busy(struct ringbuffer_entry *entry)
+{
+ _BUG_ON_PTR(entry);
+ entry_get(entry);
+
+ /* Insert new element into hash. */
+ sector_hash_elem_insert(&entry->ring->busy_sectors, entry);
+}
+
+static void
+stats_init(struct repl_log *l)
+{
+ unsigned i = 2;
+ struct stats *s = &l->stats;
+
+ memset(s, 0, sizeof(*s));
+
+ while (i--)
+ atomic_set(s->io + i, 0);
+
+ atomic_set(&s->writes_pending, 0);
+ atomic_set(&s->hash_elem, 0);
+}
+
+/* Global replicator log list. */
+LIST_HEAD(replog_list);
+
+/* Wake worker. */
+static void
+wake_do_log(struct repl_log *l)
+{
+ queue_work(l->io.wq, &l->io.ws);
+}
+
+struct dm_repl_slink *
+slink_find(struct repl_log *l, int slink_nr)
+{
+ struct dm_repl_slink *slink0 = l->slink0;
+
+ if (!slink0)
+ return ERR_PTR(-ENOENT);
+
+ _BUG_ON_SLINK_NR(l, slink_nr);
+ return slink_nr ? slink0->ops->slink(l->replog, slink_nr) : slink0;
+}
+
+/*
+ * If a slink is asynchronous, check to see if it needs to fall
+ * back to synchronous mode due to falling too far behind.
+ *
+ * Declare a bunch of fallbehind specific small functions in order
+ * to avoid conditions in the fast path by accessing them via
+ * function pointers.
+ */
+/* True if slink exceeds fallbehind threshold. */
+static int
+slink_fallbehind_exceeded(struct repl_log *l, struct slink_state *ss,
+ struct dm_repl_slink_fallbehind *fallbehind,
+ unsigned amount)
+{
+ sector_t *sectors;
+ uint64_t *ios;
+ unsigned long *head_jiffies;
+
+ _BUG_ON_PTR(l);
+ _BUG_ON_PTR(ss);
+ _BUG_ON_PTR(fallbehind);
+ ios = &ss->fb.outstanding.ios;
+ sectors = &ss->fb.outstanding.sectors;
+
+ spin_lock(&l->io.lock);
+ (*ios)++;
+ (*sectors) += amount;
+ spin_unlock(&l->io.lock);
+
+ if (!fallbehind->value)
+ return 0;
+
+ switch (fallbehind->type) {
+ case DM_REPL_SLINK_FB_IOS:
+ return *ios > fallbehind->value;
+
+ case DM_REPL_SLINK_FB_SIZE:
+ return *sectors > fallbehind->value;
+
+ case DM_REPL_SLINK_FB_TIMEOUT:
+ head_jiffies = &ss->fb.head_jiffies;
+ if (unlikely(!*head_jiffies))
+ *head_jiffies = jiffies;
+
+ return time_after(jiffies, *head_jiffies +
+ msecs_to_jiffies(fallbehind->value));
+
+ default:
+ BUG();
+ }
+
+ return 0;
+}
+
+/*
+ * True if slink falls below fallbehind threshold.
+ *
+ * Can be called from interrupt context.
+ */
+static int
+slink_fallbehind_recovered(struct repl_log *l, struct slink_state *ss,
+ struct dm_repl_slink_fallbehind *fallbehind,
+ unsigned amount)
+{
+ sector_t *sectors;
+ uint64_t *ios;
+
+ _BUG_ON_PTR(ss);
+ _BUG_ON_PTR(fallbehind);
+ ios = &ss->fb.outstanding.ios;
+ sectors = &ss->fb.outstanding.sectors;
+
+ /* Need the non-irq versions here, because IRQs are already disabled. */
+ spin_lock(&l->io.lock);
+ (*ios)--;
+ (*sectors) -= amount;
+ spin_unlock(&l->io.lock);
+
+ if (!fallbehind->value)
+ return 0;
+
+ switch (fallbehind->type) {
+ case DM_REPL_SLINK_FB_IOS:
+ return *ios <= fallbehind->value;
+
+ case DM_REPL_SLINK_FB_SIZE:
+ return *sectors <= fallbehind->value;
+
+ case DM_REPL_SLINK_FB_TIMEOUT:
+ return time_before(jiffies, ss->fb.head_jiffies +
+ msecs_to_jiffies(fallbehind->value));
+ default:
+ BUG();
+ }
+
+ return 0;
+}
+
+/*
+ * Update fallbehind accounting.
+ *
+ * Has to be called with rw lock held.
+ */
+/* FIXME: account for resynchronization. */
+enum fb_update_type { UPD_INC, UPD_DEC };
+static void
+slink_fallbehind_update(enum fb_update_type type,
+ struct dm_repl_slink *slink,
+ struct ringbuffer_entry *entry)
+{
+ int slink_nr, sync;
+ struct repl_log *l;
+ struct slink_state *ss;
+ struct data_header_region *region;
+ struct dm_repl_slink_fallbehind *fallbehind;
+ struct ringbuffer_entry *pos;
+
+ _BUG_ON_PTR(slink);
+ fallbehind = slink->ops->fallbehind(slink);
+ _BUG_ON_PTR(fallbehind);
+ _BUG_ON_PTR(entry);
+ l = ringbuffer_repl_log(entry->ring);
+ _BUG_ON_PTR(l);
+ slink_nr = slink->ops->slink_number(slink);
+ _BUG_ON_SLINK_NR(l, slink_nr);
+ region = &entry->data.header->region;
+ _BUG_ON_PTR(region);
+
+ /*
+ * We can access ss w/o a lock, because it's referenced by
+ * inflight I/Os and by the running worker which processes
+ * this function.
+ */
+ ss = slink->caller;
+ if (!ss)
+ return;
+
+ _BUG_ON_PTR(ss);
+ sync = SsSync(ss);
+
+ switch (type) {
+ case UPD_INC:
+ if (slink_fallbehind_exceeded(l, ss, fallbehind,
+ region->size) &&
+ !TestSetSsSync(ss) &&
+ !sync)
+ DMINFO("enforcing fallbehind sync on slink=%d at %u",
+ slink_nr, jiffies_to_msecs(jiffies));
+ break;
+
+ case UPD_DEC:
+ /*
+ * Walk the list of outstanding copy I/Os and update the
+ * start_jiffies value with the first entry found.
+ */
+ list_for_each_entry(pos, L_SLINK_COPY_LIST(l),
+ lists.l[E_WRITE_OR_COPY]) {
+ struct slink_copy_context *cc;
+
+ list_for_each_entry(cc, E_COPY_CONTEXT_LIST(pos),
+ list) {
+ if (cc->slink->ops->slink_number(cc->slink) ==
+ slink_nr) {
+ ss->fb.head_jiffies = cc->start_jiffies;
+ break;
+ }
+ }
+ }
+
+ if (slink_fallbehind_recovered(l, ss, fallbehind,
+ region->size)) {
+ ss->fb.head_jiffies = 0;
+
+ if (TestClearSsSync(ss) && sync) {
+ DMINFO("releasing fallbehind sync on slink=%d"
+ " at %u",
+ slink_nr, jiffies_to_msecs(jiffies));
+ wake_do_log(l);
+ }
+ }
+
+ break;
+
+ default:
+ BUG();
+ }
+}
+
+static inline void
+slink_fallbehind_inc(struct dm_repl_slink *slink,
+ struct ringbuffer_entry *entry)
+{
+ slink_fallbehind_update(UPD_INC, slink, entry);
+}
+
+static inline void
+slink_fallbehind_dec(struct dm_repl_slink *slink,
+ struct ringbuffer_entry *entry)
+{
+ slink_fallbehind_update(UPD_DEC, slink, entry);
+}
+
+/* Caller properties definition for dev_io(). */
+struct dev_io_params {
+ struct repl_dev *dev;
+ sector_t sector;
+ unsigned size;
+ struct dm_io_memory mem;
+ struct dm_io_notify notify;
+ unsigned long flags;
+};
+
+/*
+ * Read/write device items.
+ *
+ * If dio->notify.fn is set, an asynchronous dm_io()
+ * call will be performed, else a synchronous one.
+ */
+static int
+dev_io(int rw, struct ringbuffer *ring, struct dev_io_params *dio)
+{
+ BUG_ON(dio->size > BIO_MAX_SIZE);
+ DMDEBUG_LIMIT("%s: rw: %d, %u sectors at sector %llu, dev %p",
+ __func__, rw, dio->size,
+ (unsigned long long) dio->sector,
+ dio->dev->dm_dev->bdev);
+
+ /* Flag IO queued on asynchronous calls. */
+ if (dio->notify.fn)
+ SetRingBufferIOQueued(ring);
+
+ return dm_io(
+ &(struct dm_io_request) {
+ .bi_rw = rw,
+ .mem = dio->mem,
+ .notify = dio->notify,
+ .client = replog_io_client(dev_repl_log(dio->dev))
+ }, 1 /* 1 region following */,
+ &(struct dm_io_region) {
+ .bdev = dio->dev->dm_dev->bdev,
+ .sector = dio->sector,
+ .count = round_up_to_sector(dio->size),
+ },
+ NULL
+ );
+}
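+
+/*
+ * Illustrative use: header_io() below leaves dio.notify.fn NULL to get a
+ * synchronous dm_io() call, while ringbuffer_write_entry() passes an endio
+ * callback (e.g. { data_endio, entry }) so that dm_io() completes
+ * asynchronously and the remaining work is deferred to the daemon.
+ */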
+
+/* Definition of properties/helper functions for header IO. */
+struct header_io_spec {
+ const char *name; /* Header identifier (eg. 'data'). */
+ unsigned size; /* Size of ondisk structure. */
+ /* Disk structure allocation helper. */
+ void *(*alloc_disk)(struct ringbuffer *);
+ /* Disk structure deallocation helper. */
+ void (*free_disk)(void *, struct ringbuffer *);
+ /* Disk structure to core structure xfer helper. */
+ int (*to_core_fn)(unsigned bitmap_elems, void *, void *);
+ /* Core structure to disk structure xfer helper. */
+ void (*to_disk_fn)(unsigned bitmap_elems, void *, void *);
+};
+/* Macro to initialize type specific header_io_spec structure. */
+#define IO_SPEC(header) \
+ { .name = # header, \
+ .size = sizeof(struct header ## _header_disk), \
+ .alloc_disk = alloc_ ## header ## _header_disk, \
+ .free_disk = free_ ## header ## _header_disk, \
+ .to_core_fn = header ## _header_to_core, \
+ .to_disk_fn = header ## _header_to_disk }
+
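+/*
+ * E.g. IO_SPEC(log) expands to
+ *
+ *	{ .name = "log",
+ *	  .size = sizeof(struct log_header_disk),
+ *	  .alloc_disk = alloc_log_header_disk,
+ *	  .free_disk = free_log_header_disk,
+ *	  .to_core_fn = log_header_to_core,
+ *	  .to_disk_fn = log_header_to_disk }
+ */
+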
+enum header_type { IO_LOG, IO_BUFFER, IO_DATA };
+struct header_io_params {
+ enum header_type type;
+ struct repl_log *l;
+ void *core_header;
+ sector_t sector;
+ void (*disk_header_fn)(void *);
+};
+
+/* Read/write a {log,buffer,data} header to disk. */
+static int
+header_io(int rw, struct header_io_params *hio)
+{
+ int r;
+ struct repl_log *l = hio->l;
+ struct ringbuffer *ring = &l->ringbuffer;
+ /* Specs of all log headers. Must be in 'enum header_type' order! */
+ static const struct header_io_spec io_specs[] = {
+ IO_SPEC(log),
+ IO_SPEC(buffer),
+ IO_SPEC(data),
+ };
+ const struct header_io_spec *io = io_specs + hio->type;
+ void *disk_header = io->alloc_disk(ring);
+ struct dev_io_params dio = {
+ &l->params.dev, hio->sector, io->size,
+ .mem = { DM_IO_KMEM, { .addr = disk_header }, 0 },
+ .notify = { NULL, NULL}
+ };
+
+ BUG_ON(io < io_specs || io >= ARRAY_END(io_specs));
+ BUG_ON(!hio->core_header);
+ BUG_ON(!disk_header);
+ memset(disk_header, 0, io->size);
+
+ if (rw == WRITE) {
+ io->to_disk_fn(BITMAP_ELEMS(l), disk_header, hio->core_header);
+
+ /* If disk header needs special handling before write. */
+ if (hio->disk_header_fn)
+ hio->disk_header_fn(disk_header);
+ }
+
+ r = dev_io(rw, ring, &dio);
+ if (unlikely(r)) {
+ SetRingBufferError(ring);
+ DMERR("Failed to %s %s header!",
+ rw == WRITE ? "write" : "read", io->name);
+ } else if (rw == READ) {
+ r = io->to_core_fn(BITMAP_ELEMS(l), hio->core_header,
+ disk_header);
+ if (unlikely(r))
+ DMERR("invalid %s header/sector=%llu",
+ io->name, (unsigned long long) hio->sector);
+ }
+
+ io->free_disk(disk_header, ring);
+ return r;
+}
+
+/* Read/write the log header synchronously. */
+static inline int
+log_header_io(int rw, struct repl_log *l)
+{
+ return header_io(rw, &(struct header_io_params) {
+ IO_LOG, l, l->header.log, l->params.dev.start, NULL });
+}
+
+/* Read/write the ring buffer header synchronously. */
+static inline int
+buffer_header_io(int rw, struct repl_log *l)
+{
+ return header_io(rw, &(struct header_io_params) {
+ IO_BUFFER, l, &l->ringbuffer,
+ l->header.log->buffer_header, NULL });
+}
+
+/* Read/write a data header to/from the ring buffer synchronously. */
+static inline int
+data_header_io(int rw, struct repl_log *l,
+ struct data_header *header, sector_t sector)
+{
+ return header_io(rw, &(struct header_io_params) {
+ IO_DATA, l, header, sector, NULL });
+}
+
+/* Notify dm-repl.c to submit more IO. */
+static void
+notify_caller(struct repl_log *l, int rw, int error)
+{
+ struct replog_notify notify;
+
+ _BUG_ON_PTR(l);
+
+ spin_lock(&l->io.lock);
+ notify = l->notify;
+ spin_unlock(&l->io.lock);
+
+ if (likely(notify.fn)) {
+ if (rw == READ)
+ notify.fn(error, 0, notify.context);
+ else
+ notify.fn(0, error, notify.context);
+ }
+}
+
+/*
+ * Ring buffer routines.
+ *
+ * The ring buffer needs to keep track of arbitrarily-sized data items.
+ * HEAD points to the first data header that needs to be replicated. This
+ * can mean it has been partially replicated or not replicated at all.
+ * The ring buffer is empty if HEAD == TAIL.
+ * The ring buffer is full if HEAD == TAIL + len(TAIL) modulo device size.
+ *
+ * An entry in the buffer is not valid until both the data header and the
+ * associated data items are on disk. Multiple data headers and data items
+ * may be written in parallel. This means that, in addition to the
+ * traditional HEAD and TAIL pointers, we need to keep track of an in-core
+ * variable reflecting the next area in the log that is unallocated. We also
+ * need to keep an ordered list of pending and completed buffer entry writes.
+ */
+/*
+ * Check and wrap a ring buffer offset around ring buffer end.
+ *
+ * There are three cases to distinguish here:
+ * 1. header and data fit before ring->end
+ * 2. header fits before ring->end, data doesn't -> remap data to ring->start
+ * 3. header doesn't fit before ring->end -> remap both to ring->start
+ *
+ * Function returns the next rounded offset *after* any
+ * conditional remapping of the actual header.
+ *
+ */
+static sector_t
+sectors_unused(struct ringbuffer *ring, sector_t first_free)
+{
+ return (ring->end < first_free) ? 0 : ring->end - first_free;
+}
+
+/*
+ * Return the first sector past the end of the header
+ * (i.e. the first data sector).
+ */
+static inline sector_t
+data_start(struct data_header *header)
+{
+ return header->pos.header + HEADER_SECTORS;
+}
+
+/*
+ * Return the first sector past the end of the entry.
+ * (i.e. the first unused sector).
+ */
+static inline sector_t
+next_start(struct data_header *header)
+{
+ return header->pos.data + header->pos.data_sectors;
+}
+
+static inline sector_t
+next_start_adjust(struct ringbuffer *ring, struct data_header *header)
+{
+ sector_t next_sector = next_start(header);
+
+ return likely(sectors_unused(ring, next_sector) < HEADER_SECTORS) ?
+ ring->start : next_sector;
+}
+
+/* True if entry doesn't wrap. */
+static inline int
+not_wrapped(struct data_header *header)
+{
+ return header->wrap == WRAP_NONE;
+}
+
+/* True if header at ring end and data wrapped to ring start. */
+static inline int
+data_wrapped(struct data_header *header)
+{
+ return header->wrap == WRAP_DATA;
+}
+
+/* True if next entry wraps to ring start. */
+static inline int
+next_entry_wraps(struct data_header *header)
+{
+ return header->wrap == WRAP_NEXT;
+}
+
+/* Return amount of skipped sectors in case of wrapping. */
+static unsigned
+sectors_skipped(struct ringbuffer *ring, struct data_header *header)
+{
+ if (likely(not_wrapped(header)))
+ /* noop */ ;
+ else if (data_wrapped(header))
+ return sectors_unused(ring, data_start(header));
+ else if (next_entry_wraps(header))
+ return sectors_unused(ring, next_start(header));
+
+ return 0;
+}
+
+/* Emit log error messages only once. */
+static void
+ringbuffer_error(enum ring_status_type type,
+ struct ringbuffer *ring, int error)
+{
+ struct error {
+ enum ring_status_type type;
+ int (*f)(struct ringbuffer *);
+ const char *msg;
+ };
+ static const struct error errors[] = {
+ { RING_BUFFER_DATA_ERROR, TestSetRingBufferDataError, "data" },
+ { RING_BUFFER_HEAD_ERROR, TestSetRingBufferHeadError, "head" },
+ { RING_BUFFER_HEADER_ERROR, TestSetRingBufferHeaderError,
+ "header" },
+ { RING_BUFFER_TAIL_ERROR, TestSetRingBufferTailError, "tail" },
+ };
+ const struct error *e = ARRAY_END(errors);
+
+ while (e-- > errors) {
+ if (type == e->type) {
+ if (!e->f(ring))
+ DMERR("ring buffer %s I/O error %d",
+ e->msg, error);
+
+			SetRingBufferError(ring);
+			return;
+ }
+ }
+
+ BUG();
+}
+
+/*
+ * Allocate space for a data item in the ring buffer.
+ *
+ * header->pos is filled in with the sectors for the header and data in
+ * the ring buffer. The free space in the ring buffer is decremented to
+ * account for this entry. The return value is the sector address for the
+ * next data_header_disk.
+ */
+
+/* Increment buffer offset past actual header, optionally wrapping data. */
+static sector_t
+ringbuffer_inc(struct ringbuffer *ring, struct data_header *header)
+{
+ sector_t sectors;
+
+ /* Initialize the header with the common case */
+ header->pos.header = ring->next_avail;
+ header->pos.data = data_start(header);
+
+ /*
+ * Header doesn't fit before ring->end.
+ *
+ * This can only happen when we are started with an empty ring
+ * buffer that has its tail near the end of the device.
+ */
+ if (unlikely(data_start(header) > ring->end)) {
+ /*
+ * Wrap an entire entry (header + data) to the beginning of
+ * the log device. This will update the ring free sector
+ * count to account for the unused sectors at the end
+ * of the device.
+ */
+ header->pos.header = ring->start;
+ header->pos.data = data_start(header);
+ /* Data doesn't fit before ring->end. */
+ } else if (unlikely(next_start(header) > ring->end)) {
+ /*
+ * Wrap the data portion of a ring buffer entry to the
+ * beginning of the log device. This will update the ring
+ * free sector count to account for the unused sectors at
+ * the end of the device.
+ */
+ header->pos.data = ring->start;
+ header->wrap = WRAP_DATA;
+
+ ringbuffer_repl_log(ring)->stats.wrap++;
+ } else
+ header->wrap = WRAP_NONE;
+
+ sectors = roundup_sectors(header->pos.data_sectors);
+ BUG_ON(sectors > ring->pending);
+ ring->pending -= sectors;
+
+ sectors = next_start_adjust(ring, header);
+ if (sectors == ring->start) {
+ header->wrap = WRAP_NEXT;
+
+ ringbuffer_repl_log(ring)->stats.wrap++;
+ }
+
+ return sectors;
+}
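+
+/*
+ * Worked example of the wrap cases above (illustrative geometry only;
+ * HEADER_SECTORS = 8, ring->start = 8, ring->end = 1000, 16 data sectors):
+ *
+ * o next_avail = 100: header at 100, data at 108, next entry at 124
+ *   (WRAP_NONE).
+ * o next_avail = 984: the header still fits (data would start at 992), but
+ *   992 + 16 > 1000, so the data wraps to sector 8 (WRAP_DATA).
+ * o next_avail = 996: not even the header fits before 1000, so header and
+ *   data both move to ring->start (sectors 8 and 16).
+ * o next_avail = 972: header at 972, data at 980, but only 4 sectors remain
+ *   past 996, so the *next* entry wraps to ring->start (WRAP_NEXT).
+ */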
+
+/* Slab and mempool definition. */
+struct cache_defs {
+ const enum ring_pool_type type;
+ const int min;
+ const size_t size;
+ struct kmem_cache *slab_pool;
+ const char *slab_name;
+ const size_t align;
+};
+
+/* Slab and mempool declarations. */
+static struct cache_defs cache_defs[] = {
+ { ENTRY, ENTRY_POOL_MIN, sizeof(struct ringbuffer_entry),
+ NULL, "dm_repl_log_entry", 0 },
+ { DATA_HEADER, HEADER_POOL_MIN, sizeof(struct data_header),
+ NULL, "dm_repl_log_header", 0 },
+ { DATA_HEADER_DISK, HEADER_POOL_MIN, DATA_HEADER_DISK_SIZE,
+ NULL, "dm_repl_log_disk_header", DATA_HEADER_DISK_SIZE },
+ { COPY_CONTEXT, CC_POOL_MIN, sizeof(struct slink_copy_context),
+ NULL, "dm_repl_log_copy", 0 },
+};
+
+/* Destroy all memory pools for a ring buffer. */
+static void
+ringbuffer_exit(struct ringbuffer *ring)
+{
+ mempool_t **pool = ARRAY_END(ring->pools);
+
+ sector_hash_exit(&ring->busy_sectors);
+
+ while (pool-- > ring->pools) {
+ if (likely(*pool)) {
+ mempool_destroy(*pool);
+ *pool = NULL;
+ }
+ }
+}
+
+/* Create all mempools for a ring buffer. */
+static int
+ringbuffer_init(struct ringbuffer *ring)
+{
+ int r;
+ struct repl_log *l = ringbuffer_repl_log(ring);
+ struct cache_defs *pd = ARRAY_END(cache_defs);
+
+ mutex_init(&l->ringbuffer.mutex);
+ init_waitqueue_head(&ring->flushq);
+
+ /* Create slab pools. */
+ while (pd-- > cache_defs) {
+ /* Bitmap is not a slab pool. */
+ if (!pd->size)
+ continue;
+
+ ring->pools[pd->type] =
+ mempool_create_slab_pool(pd->min, pd->slab_pool);
+
+ if (unlikely(!ring->pools[pd->type])) {
+ DMERR("Error creating mempool %s", pd->slab_name);
+ goto bad;
+ }
+ }
+
+ /* Initialize busy sector hash. */
+ r = sector_hash_init(&ring->busy_sectors, l->params.dev.size);
+ if (r < 0) {
+ DMERR("Failed to allocate sector busy hash!");
+ goto bad;
+ }
+
+ return 0;
+
+bad:
+ ringbuffer_exit(ring);
+ return -ENOMEM;
+}
+
+/*
+ * Reserve space in the ring buffer for the
+ * given bio data and associated header.
+ *
+ * Correct ring->free by any skipped sectors at the end of the ring buffer.
+ */
+static int
+ringbuffer_reserve_space(struct ringbuffer *ring, struct bio *bio)
+{
+ unsigned nsectors = roundup_sectors(bio_sectors(bio));
+ sector_t end_space, start_sector;
+
+ if (!nsectors)
+ return -EPERM;
+
+ BUG_ON(!mutex_is_locked(&ring->mutex));
+
+ if (unlikely(ring->free < nsectors)) {
+ SetRingBufferFull(ring);
+ return -EBUSY;
+ }
+
+ /*
+ * Account for the sectors that are queued for do_log()
+ * but have not been accounted for on the disk. We need this
+ * calculation to see if any sectors will be lost from our
+	 * free pool at the end of the ring buffer.
+ */
+ start_sector = ring->next_avail + ring->pending;
+ end_space = sectors_unused(ring, start_sector);
+
+ /* if the whole I/O won't fit before the end of the disk. */
+ if (unlikely(end_space && end_space < nsectors)) {
+ sector_t skipped = end_space >= HEADER_SECTORS ?
+ sectors_unused(ring, start_sector + HEADER_SECTORS) :
+ end_space;
+
+ /* Don't subtract skipped sectors in case the bio won't fit. */
+ if (ring->free - skipped < nsectors)
+ return -EBUSY;
+
+ /*
+ * We subtract the amount of skipped sectors
+ * from ring->free here..
+ *
+ * ringbuffer_advance_head() will add them back on.
+ */
+ ring->free -= skipped;
+ }
+
+ ring->free -= nsectors;
+ ring->pending += nsectors;
+ return 0;
+}
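+
+/*
+ * Illustrative numbers: with ring->end = 1000, ring->free = 200,
+ * ring->next_avail + ring->pending = 990 and a 16 sector bio
+ * (nsectors = 24), end_space is 10 < 24.  Since 10 >= HEADER_SECTORS,
+ * only the 2 sectors behind the header at the device end are skipped;
+ * the reservation succeeds with ring->free = 200 - 2 - 24 = 174 and
+ * ring->pending increased by 24.  Had end_space been smaller than
+ * HEADER_SECTORS, the whole remaining gap would have been skipped.
+ */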
+
+static int
+ringbuffer_empty_nolock(struct ringbuffer *ring)
+{
+ return (ring->head == ring->tail) && !RingBufferFull(ring);
+}
+
+static int
+ringbuffer_empty(struct ringbuffer *ring)
+{
+ int r;
+
+ mutex_lock(&ring->mutex);
+ r = ringbuffer_empty_nolock(ring);
+ mutex_unlock(&ring->mutex);
+
+ return r;
+}
+
+static void
+set_sync_mask(struct repl_log *l, struct ringbuffer_entry *entry)
+{
+ unsigned long slink_nr;
+
+ /* Bitmask of slinks with synchronous I/O completion policy. */
+ for_each_bit(slink_nr, ENTRY_SLINKS(entry), l->slink.max) {
+ struct dm_repl_slink *slink = slink_find(l, slink_nr);
+
+ /* Slink not configured. */
+ if (unlikely(IS_ERR(slink)))
+ continue;
+
+		/* If a slink has fallen behind an I/O threshold, it
+ * must be marked for synchronous I/O completion. */
+ if (slink_synchronous(slink) ||
+ SsSync(slink->caller))
+ slink_set_bit(slink_nr, ENTRY_SYNC(entry));
+ }
+}
+
+/*
+ * Always returns an initialized write entry,
+ * unless a fatal memory allocation failure occurs.
+ */
+static struct ringbuffer_entry *
+ringbuffer_alloc_entry(struct ringbuffer *ring, struct bio *bio)
+{
+ int dev_number, i;
+ struct repl_log *l = ringbuffer_repl_log(ring);
+ struct ringbuffer_entry *entry = alloc_entry(ring);
+ struct data_header *header = alloc_header(ring);
+ struct data_header_region *region;
+
+ BUG_ON(!entry);
+ BUG_ON(!header);
+ memset(entry, 0, sizeof(*entry));
+ memset(header, 0, sizeof(*header));
+
+ /* Now setup the ringbuffer_entry. */
+ atomic_set(&entry->endios, 0);
+ atomic_set(&entry->ref, 0);
+ entry->ring = ring;
+ entry->data.header = header;
+ header->wrap = WRAP_NONE;
+
+ i = ARRAY_SIZE(entry->lists.l);
+ while (i--)
+ INIT_LIST_HEAD(entry->lists.l + i);
+
+ /*
+ * In case we're called with a bio, we're creating a new entry
+ * or we're allocating it for reading the header in during init.
+ */
+ if (bio) {
+ struct dm_repl_slink *slink0 = slink_find(l, 0);
+
+ _BUG_ON_PTR(slink0);
+
+ /* Setup the header region. */
+ dev_number = slink0->ops->dev_number(slink0, bio->bi_bdev);
+ BUG_ON(dev_number < 0);
+ region = &header->region;
+ region->dev = dev_number;
+ region->sector = bio_begin(bio);
+ region->size = bio->bi_size;
+ BUG_ON(!region->size);
+ header->pos.data_sectors =
+ roundup_data_sectors(bio_sectors(bio));
+
+ entry->bios.write = bio;
+ sector_range_mark_busy(entry);
+
+ /*
+ * Successfully allocated space in the ring buffer
+ * for this entry. Advance our in-memory tail pointer.
+ * Round up to HEADER_SECTORS boundary for supporting
+ * up to 4k sector sizes.
+ */
+ mutex_lock(&ring->mutex);
+ ring->next_avail = ringbuffer_inc(ring, header);
+ mutex_unlock(&ring->mutex);
+
+		/* Bitmask of slinks to initiate copies across. */
+ memcpy(ENTRY_SLINKS(entry), LOG_SLINKS(l), BITMAP_SIZE(l));
+
+ /* Set synchronous I/O policy mask. */
+ set_sync_mask(l, entry);
+ }
+
+ /* Add header to the ordered list of headers. */
+ list_add_tail(E_ORDERED_LIST(entry), L_ENTRY_ORDERED_LIST(l));
+
+ DMDEBUG_LIMIT("%s header->pos.header=%llu header->pos.data=%llu "
+ "advancing ring->next_avail=%llu", __func__,
+ (unsigned long long) header->pos.header,
+ (unsigned long long) header->pos.data,
+ (unsigned long long) ring->next_avail);
+ return entry;
+}
+
+/* Free a ring buffer entry and the data header hanging off it. */
+static void
+ringbuffer_free_entry(struct ringbuffer_entry *entry)
+{
+ struct ringbuffer *ring;
+
+ _BUG_ON_PTR(entry);
+ _BUG_ON_PTR(entry->data.header);
+
+ ring = entry->ring;
+ _BUG_ON_PTR(ring);
+
+ /*
+ * Will need to change once ringbuffer_entry is
+ * not kept around as long as the data header.
+ */
+ if (!list_empty(E_BUSY_HASH_LIST(entry))) {
+ DMERR("%s E_BUSY_HAS_LIST not empty!", __func__);
+ BUG();
+ }
+
+ if (!list_empty(E_COPY_CONTEXT_LIST(entry))) {
+ DMERR("%s E_COPY_CONTEXT_LIST not empty!", __func__);
+ BUG();
+ }
+
+ if (!list_empty(E_ORDERED_LIST(entry)))
+ list_del(E_ORDERED_LIST(entry));
+
+ if (!list_empty(E_WRITE_OR_COPY_LIST(entry)))
+ list_del(E_WRITE_OR_COPY_LIST(entry));
+
+ free_header(entry->data.header, ring);
+ free_entry(entry, ring);
+}
+
+/* Mark a ring buffer entry invalid on the backing store device. */
+static void
+disk_header_set_invalid(void *ptr)
+{
+ ((struct data_header_disk *) ptr)->valid = 0;
+}
+
+static int
+ringbuffer_mark_entry_invalid(struct ringbuffer *ring,
+ struct ringbuffer_entry *entry)
+{
+ struct data_header *header = entry->data.header;
+
+ return header_io(WRITE, &(struct header_io_params) {
+		IO_DATA, ringbuffer_repl_log(ring),
+ header, header->pos.header, disk_header_set_invalid });
+}
+
+enum endio_type { HEADER_ENDIO = 0, DATA_ENDIO, NR_ENDIOS };
+static void
+endio(struct ringbuffer_entry *entry,
+ enum endio_type type, unsigned long error)
+{
+ *(type == DATA_ENDIO ? &entry->data.error.data :
+ &entry->data.error.header) = error;
+
+ if (atomic_dec_and_test(&entry->endios))
+ /*
+ * Endio processing requires disk writes to advance the log
+ * tail pointer. So, we need to defer this to process context.
+ * The endios are processed from the l->lists.entry.io list,
+ * and the entry is already on that list.
+ */
+ wake_do_log(ringbuffer_repl_log(entry->ring));
+ else
+ BUG_ON(atomic_read(&entry->endios) < 0);
+}
+
+/* Endio routine for data header io. */
+static void
+header_endio(unsigned long error, void *context)
+{
+ endio(context, HEADER_ENDIO, error);
+}
+
+/* Endio routine for data io (ie. the bio data written for an entry). */
+static void
+data_endio(unsigned long error, void *context)
+{
+ endio(context, DATA_ENDIO, error);
+}
+
+/*
+ * Place the data contained in bio asynchronously
+ * into the replog's ring buffer.
+ *
+ * This can be void, because any allocation failure is fatal and any
+ * IO errors will be reported asynchronously via dm_io() callbacks.
+ */
+static void
+ringbuffer_write_entry(struct repl_log *l, struct bio *bio)
+{
+ int i;
+ struct ringbuffer *ring = &l->ringbuffer;
+ /*
+ * ringbuffer_alloc_entry returns an entry,
+ * including an initialized data_header.
+ */
+ struct ringbuffer_entry *entry = ringbuffer_alloc_entry(ring, bio);
+ struct data_header_disk *disk_header = alloc_data_header_disk(ring);
+ struct data_header *header = entry->data.header;
+ struct dev_io_params dio[] = {
+ { /* Data IO specs. */
+ &l->params.dev, header->pos.data, bio->bi_size,
+ .mem = { DM_IO_BVEC, { .bvec = bio_iovec(bio) },
+ bio_offset(bio) },
+ .notify = { data_endio, entry }
+ },
+ { /* Header IO specs. */
+ &l->params.dev, header->pos.header, DATA_HEADER_DISK_SIZE,
+ .mem = { DM_IO_KMEM, { .addr = disk_header }, 0 },
+ .notify = { header_endio, entry }
+ },
+ };
+
+ DMDEBUG_LIMIT("in %s %u", __func__, jiffies_to_msecs(jiffies));
+ BUG_ON(!disk_header);
+ entry->data.disk_header = disk_header;
+ data_header_to_disk(BITMAP_ELEMS(l), disk_header, header);
+
+ /* Take ringbuffer IO reference out vs. slink0. */
+ ss_io_get(l->slink0->caller);
+
+ /* Add to ordered list of active entries. */
+ list_add_tail(E_WRITE_OR_COPY_LIST(entry), L_ENTRY_RING_WRITE_LIST(l));
+
+ DMDEBUG_LIMIT("%s writing header to offset=%llu and bio for "
+ "sector=%llu to sector=%llu/size=%llu", __func__,
+ (unsigned long long) entry->data.header->pos.header,
+ (unsigned long long) bio_begin(bio),
+ (unsigned long long) entry->data.header->pos.data,
+ (unsigned long long) to_sector(dio[1].size));
+
+ /*
+ * Submit the writes.
+ *
+ * 1 I/O count for header + 1 for data
+ */
+ i = ARRAY_SIZE(dio);
+ atomic_set(&entry->endios, i);
+ while (i--)
+ BUG_ON(dev_io(WRITE, ring, dio + i));
+
+ DMDEBUG_LIMIT("out %s %u", __func__, jiffies_to_msecs(jiffies));
+}
+
+/* Endio routine for bio data reads off the ring buffer. */
+static void
+read_bio_vec_endio(unsigned long error, void *context)
+{
+ struct ringbuffer_entry *entry = context;
+ struct ringbuffer *ring = entry->ring;
+ struct repl_log *l = ringbuffer_repl_log(ring);
+
+ atomic_dec(&entry->endios);
+ BUG_ON(!entry->bios.read);
+ bio_endio(entry->bios.read, error ? -EIO : 0);
+ entry->bios.read = NULL;
+ entry_put(entry);
+ wake_do_log(l);
+
+ /* Release IO reference on slink0. */
+ ss_io_put(l->slink0->caller);
+}
+
+/* Read bio data off the ring buffer. */
+static void
+ringbuffer_read_bio_vec(struct repl_log *l,
+ struct ringbuffer_entry *entry, sector_t offset,
+ struct bio *bio)
+{
+ /* Data IO specs. */
+ struct dev_io_params dio = {
+ &l->params.dev,
+ entry->data.header->pos.data + offset, bio->bi_size,
+ .mem = { DM_IO_BVEC, { .bvec = bio_iovec(bio) },
+ bio_offset(bio) },
+ .notify = { read_bio_vec_endio, entry }
+ };
+
+ DMDEBUG_LIMIT("in %s %u", __func__, jiffies_to_msecs(jiffies));
+ _BUG_ON_PTR(entry);
+ entry_get(entry);
+ atomic_inc(&entry->endios);
+
+ /* Take IO reference out vs. slink0. */
+ ss_io_get(l->slink0->caller);
+
+ DMDEBUG("%s reading bio data bio for sector=%llu/size=%llu",
+ __func__, (unsigned long long) bio_begin(bio),
+ (unsigned long long) to_sector(dio.size));
+
+ /*
+ * Submit the read.
+ */
+ BUG_ON(dev_io(READ, &l->ringbuffer, &dio));
+ DMDEBUG_LIMIT("out %s %u", __func__, jiffies_to_msecs(jiffies));
+}
+
+/*
+ * Advances the ring buffer head pointer, updating the in-core data
+ * and writing it to the backing store device, but only if there are
+ * inactive entries (ie. those with copies to all slinks) at the head.
+ *
+ * Returns -ve errno on failure, otherwise the number of entries freed.
+ */
+static int
+ringbuffer_advance_head(const char *caller, struct ringbuffer *ring)
+{
+ int r;
+ unsigned entries_freed = 0;
+ sector_t sectors_freed = 0;
+ struct repl_log *l = ringbuffer_repl_log(ring);
+ struct ringbuffer_entry *entry, *entry_last = NULL, *n;
+
+	/* Count any freeable entries and remember the last one. */
+ list_for_each_entry(entry, L_ENTRY_ORDERED_LIST(l),
+ lists.l[E_ORDERED]) {
+ /* Can't advance past dirty entry. */
+ if (entry_busy(l, ENTRY_SLINKS(entry)) ||
+ atomic_read(&entry->endios))
+ break;
+
+ BUG_ON(entry_endios_pending(entry));
+ entry_last = entry;
+ entries_freed++;
+ }
+
+ /* No entries to free. */
+ if (!entries_freed)
+ return 0;
+
+ BUG_ON(!entry_last);
+
+ /* Need safe version, because ringbuffer_free_entry removes entry. */
+ list_for_each_entry_safe(entry, n, L_ENTRY_ORDERED_LIST(l),
+ lists.l[E_ORDERED]) {
+ struct data_header *header = entry->data.header;
+
+ BUG_ON(entry_busy(l, ENTRY_SLINKS(entry)) ||
+ entry_endios_pending(entry) ||
+ atomic_read(&entry->endios));
+
+ /*
+ * If the entry wrapped around between the header and
+ * the data or if the next entry wraps, free the
+ * unused sectors at the end of the device.
+ */
+ mutex_lock(&ring->mutex);
+ sectors_freed += roundup_sectors(header->pos.data_sectors)
+ + sectors_skipped(ring, header);
+ if (likely(ring->head != ring->tail))
+ ring->head = next_start_adjust(ring, header);
+ BUG_ON(ring->head >= ring->end);
+ mutex_unlock(&ring->mutex);
+
+ /* Don't access entry after this call! */
+ ringbuffer_free_entry(entry);
+
+ if (entry == entry_last)
+ break;
+ }
+
+ DMDEBUG_LIMIT("%s (%s) advancing ring buffer head for %u "
+ "entries to %llu",
+ __func__, caller, entries_freed,
+ (unsigned long long) ring->head);
+
+ /* Update ring buffer pointers in buffer header. */
+ r = buffer_header_io(WRITE, l);
+ if (likely(!r)) {
+ /* Buffer header written... */
+ mutex_lock(&ring->mutex);
+ ring->free += sectors_freed;
+ mutex_unlock(&ring->mutex);
+ }
+
+	/* Inform the caller that we're willing to receive more I/Os. */
+ ClearRingBlocked(ring);
+ ClearRingBufferFull(ring);
+ notify_caller(l, WRITE, 0);
+ if (unlikely(r < 0))
+ ringbuffer_error(RING_BUFFER_HEAD_ERROR, ring, r);
+
+ return r ? r : entries_freed;
+}
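+
+/*
+ * Illustrative walk-through of head advancement (descriptive only):
+ * assume the oldest three entries have completed their copies to all
+ * site links.  The first loop counts them (entries_freed == 3), the
+ * second loop accumulates their rounded-up data sectors plus any
+ * sectors skipped on wrap-around, moves ring->head past each entry
+ * and frees the in-core state.  ring->free is only increased after
+ * the buffer header has been written back successfully, so a failed
+ * header write never over-reports available space.
+ */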
+
+/*
+ * Advances the tail pointer after a successful
+ * write of an entry to the log.
+ */
+static int
+ringbuffer_advance_tail(struct ringbuffer_entry *entry)
+{
+ int r;
+ sector_t new_tail, old_tail;
+ struct ringbuffer *ring = entry->ring;
+ struct repl_log *l = ringbuffer_repl_log(ring);
+ struct data_header *header = entry->data.header;
+
+/*
+ if (unlikely(ring->tail != header->pos.header)) {
+ DMERR("ring->tail %llu header->pos.header %llu",
+ (unsigned long long) ring->tail,
+ (unsigned long long) header->pos.header);
+ BUG();
+ }
+*/
+
+ mutex_lock(&ring->mutex);
+ old_tail = ring->tail;
+ /* Should we let this get out of sync? */
+ new_tail = ring->tail = next_start_adjust(ring, header);
+ BUG_ON(ring->tail >= ring->end);
+ mutex_unlock(&ring->mutex);
+
+	DMDEBUG_LIMIT("%s header->pos.header=%llu header->pos.data=%llu "
+		      "old ring->tail=%llu; "
+		      "advancing ring tail pointer to %llu",
+		      __func__,
+		      (unsigned long long) header->pos.header,
+		      (unsigned long long) header->pos.data,
+		      (unsigned long long) old_tail,
+		      (unsigned long long) ring->tail);
+
+ r = buffer_header_io(WRITE, l);
+ if (unlikely(r < 0)) {
+ /* Return the I/O size to ring->free. */
+ mutex_lock(&ring->mutex);
+ /* Make sure it wasn't changed. */
+ BUG_ON(ring->tail != new_tail);
+ ring->tail = old_tail;
+ mutex_unlock(&ring->mutex);
+
+ ringbuffer_error(RING_BUFFER_TAIL_ERROR, ring, r);
+ }
+
+ return r;
+}
+
+/* Open type <-> name mapping. */
+static const struct dm_str_descr open_types[] = {
+ { OT_AUTO, "auto" },
+ { OT_OPEN, "open" },
+ { OT_CREATE, "create" },
+};
+
+/* Get slink policy flags. */
+static inline int
+_open_type(const char *name)
+{
+ return dm_descr_type(open_types, ARRAY_SIZE(open_types), name);
+}
+
+/* Get slink policy name. */
+static inline const char *
+_open_str(const int type)
+{
+ return dm_descr_name(open_types, ARRAY_SIZE(open_types), type);
+}
+
+/*
+ * Amount of free sectors in ring buffer. This function does not take
+ * into account unused sectors at the end of the log device.
+ */
+static sector_t
+ring_free(struct ringbuffer *ring)
+{
+ if (unlikely(ring->head == ring->next_avail))
+ return ring->end - ring->start;
+ else
+ return ring->head > ring->tail ?
+ ring->head - ring->tail :
+ (ring->head - ring->start) + (ring->end - ring->tail);
+}
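+
+/*
+ * Worked example for ring_free() (illustrative sector numbers only):
+ * with start=16, end=1040, tail=200 and head=600 the function returns
+ * head - tail = 400 free sectors.  With tail=600 and head=200 the data
+ * wraps around the device end, so the free space is
+ * (head - start) + (end - tail) = (200 - 16) + (1040 - 600) = 624
+ * sectors.  Unused sectors at the very end of the device are not
+ * accounted for here.
+ */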
+
+static struct log_header *
+alloc_log_header(struct repl_log *l)
+{
+ struct log_header *log_header =
+ kzalloc(sizeof(*log_header), GFP_KERNEL);
+
+ if (log_header)
+ l->header.log = log_header;
+
+ return log_header;
+}
+
+static void free_log_header(struct log_header *log_header,
+ struct ringbuffer *ring)
+{
+ kfree(log_header);
+}
+
+/* Create a new dirty log. */
+static int
+log_create(struct repl_log *l)
+{
+ int r;
+ struct log_header *log_header = l->header.log;
+ struct repl_dev *dev = &l->params.dev;
+ struct repl_params *params = &l->params;
+ struct ringbuffer *ring = &l->ringbuffer;
+
+ DMINFO("%s: creating new log", __func__);
+ _BUG_ON_PTR(log_header);
+
+ /* First, create the in-memory representation */
+ log_header->version.major = DM_REPL_LOG_MAJOR;
+ log_header->version.minor = DM_REPL_LOG_MINOR;
+ log_header->version.subminor = DM_REPL_LOG_MICRO;
+ log_header->size = params->dev.size;
+ log_header->buffer_header = dev->start + HEADER_SECTORS;
+
+ /* Write log header to device. */
+ r = log_header_io(WRITE, l);
+ if (unlikely(r < 0)) {
+ free_log_header(log_header, ring);
+ l->header.log = NULL;
+ return r;
+ }
+
+ /*
+ * Initialize the ring buffer.
+ *
+ * Start is behind the buffer header which follows the log header.
+ */
+ ring->start = params->dev.start;
+ ring->end = ring->start + params->dev.size;
+ ring->start += 2 * HEADER_SECTORS;
+ ring->head = ring->tail = ring->next_avail = ring->start;
+ ring->free = ring_free(ring);
+
+ DMDEBUG("%s start=%llu end=%llu free=%llu", __func__,
+ (unsigned long long) ring->start,
+ (unsigned long long) ring->end,
+ (unsigned long long) ring->free);
+
+ r = buffer_header_io(WRITE, l);
+ if (unlikely(r < 0)) {
+ free_log_header(log_header, ring);
+ l->header.log = NULL;
+ return r;
+ }
+
+ return 0;
+}
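+
+/*
+ * Resulting on-disk layout after log_create() (descriptive only):
+ *
+ *   dev.start                      : log header
+ *   dev.start + HEADER_SECTORS     : buffer header
+ *   dev.start + 2 * HEADER_SECTORS : ring buffer data; head, tail and
+ *                                    next_avail all start here, i.e.
+ *                                    the ring begins empty.
+ */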
+
+/* Allocate a log_header and read header in from disk. */
+static int
+log_read(struct repl_log *l)
+{
+ int r;
+ struct log_header *log_header = l->header.log;
+ struct repl_dev *dev;
+ struct ringbuffer *ring;
+ char buf[BDEVNAME_SIZE];
+
+ _BUG_ON_PTR(log_header);
+ r = log_header_io(READ, l);
+ if (unlikely(r < 0))
+ return r;
+
+ format_dev_t(buf, l->params.dev.dm_dev->bdev->bd_dev);
+
+	/* Make sure that we can handle this version of the log. */
+	if (!memcmp(&log_header->version, &my_version, sizeof(my_version)))
+		DMINFO("Found valid log header on %s", buf);
+	else
+		DM_EINVAL("On-disk version (%d.%d.%d) is "
+			  "not supported by this module.",
+			  log_header->version.major, log_header->version.minor,
+			  log_header->version.subminor);
+
+ /*
+ * Read in the buffer_header_disk
+ */
+ r = buffer_header_io(READ, l);
+ if (unlikely(r < 0))
+ return r;
+
+ dev = &l->params.dev;
+ ring = &l->ringbuffer;
+
+ /*
+ * We'll go with the size in the log header and
+ * adjust it in the worker thread when possible.
+ */
+ ring->end = dev->start + log_header->size;
+ ring->next_avail = ring->tail;
+
+ /*
+ * The following call to ring_free is incorrect as the free
+ * space in the ring has to take into account the potential
+ * for unused sectors at the end of the device. However, once
+ * do_log_init is called, any discrepancies are fixed there.
+ */
+ ring->free = ring_free(ring);
+ return 0;
+}
+
+/*
+ * Open and read/initialize a replicator log backing store device.
+ *
+ * Must be called with dm_io client set up, because we dm_io to the device.
+ */
+/* Try to read an existing log or create a new one. */
+static int
+log_init(struct repl_log *l)
+{
+ int r;
+ struct repl_params *p = &l->params;
+ struct log_header *log_header = alloc_log_header(l);
+
+ BUG_ON(!log_header);
+
+ /* Read the log header in from disk. */
+ r = log_read(l);
+ switch (r) {
+ case 0:
+		/* Successfully read in the log. */
+ if (p->open_type == OT_CREATE)
+ DMERR("OT_CREATE requested: "
+ "initializing existing log!");
+ else
+ p->dev.size = l->header.log->size;
+
+ break;
+ case -EINVAL:
+ /*
+ * Most likely this is the initial create of the log.
+ * But, if this is an open, return failure.
+ */
+ if (p->open_type == OT_OPEN)
+ DMWARN("Can't create new replog on open!");
+ else
+ /* Try to create a new log. */
+ r = log_create(l);
+
+ break;
+ case -EIO:
+ DMERR("log_read IO error!");
+ break;
+ default:
+ DMERR("log_read failed with %d?", r);
+ }
+
+ return r;
+}
+
+/* Find a replog on the global list checking for bdev and start offset. */
+static struct repl_log *
+replog_find(dev_t dev, sector_t dev_start)
+{
+ struct repl_log *replog;
+
+ list_for_each_entry(replog, &replog_list, lists.l[L_REPLOG]) {
+ if (replog->params.dev.dm_dev->bdev->bd_dev == dev)
+ return likely(replog->params.dev.start == dev_start) ?
+ replog : ERR_PTR(-EINVAL);
+ }
+
+ return ERR_PTR(-ENOENT);
+}
+
+/* Clear all allocated slab objects in case of a busy teardown. */
+static void
+ringbuffer_free_entries(struct ringbuffer *ring)
+{
+ struct ringbuffer_entry *entry, *n;
+ struct repl_log *l = ringbuffer_repl_log(ring);
+
+ list_for_each_entry_safe(entry, n, L_ENTRY_ORDERED_LIST(l),
+ lists.l[E_ORDERED]) {
+ if (atomic_read(&entry->ref))
+ sector_range_clear_busy(entry);
+
+ ringbuffer_free_entry(entry);
+ }
+}
+
+static void
+replog_release(struct kref *ref)
+{
+ struct repl_log *l = container_of(ref, struct repl_log, ref);
+
+ BUG_ON(!list_empty(L_REPLOG_LIST(l)));
+ kfree(l);
+}
+
+/* Destroy replication log. */
+static void
+replog_destroy(struct repl_log *l)
+{
+ _BUG_ON_PTR(l);
+
+ if (l->io.wq)
+ destroy_workqueue(l->io.wq);
+
+ free_log_header(l->header.log, &l->ringbuffer);
+ ringbuffer_free_entries(&l->ringbuffer);
+ ringbuffer_exit(&l->ringbuffer);
+ kfree(l->io.buffer_header_disk);
+
+ if (l->io.io_client)
+ dm_io_client_destroy(l->io.io_client);
+}
+
+/* Release a reference on a replog freeing its resources on last drop. */
+static int
+replog_put(struct dm_repl_log *log, struct dm_target *ti)
+{
+ struct repl_log *l;
+
+ _SET_AND_BUG_ON_L(l, log);
+ dm_put_device(ti, l->params.dev.dm_dev);
+ return kref_put(&l->ref, replog_release);
+}
+
+/* Return ringbuffer log device size. */
+static sector_t
+replog_dev_size(struct dm_dev *dm_dev, sector_t size_wanted)
+{
+ sector_t dev_size = i_size_read(dm_dev->bdev->bd_inode) >> SECTOR_SHIFT;
+
+ return (!dev_size || size_wanted > dev_size) ? 0 : dev_size;
+}
+
+/* Get a reference on a replicator log. */
+static void do_log(struct work_struct *ws);
+static struct repl_log *
+replog_get(struct dm_repl_log *log, struct dm_target *ti,
+ const char *path, struct repl_params *params)
+{
+ int i, r;
+ dev_t dev;
+ sector_t dev_size;
+ struct dm_dev *dm_dev;
+ char buf[BDEVNAME_SIZE];
+ struct repl_log *l;
+ struct dm_io_client *io_client;
+
+ /* Get device with major:minor or device path. */
+ r = dm_get_device(ti, path, params->dev.start, params->dev.size,
+ FMODE_WRITE, &dm_dev);
+ if (r) {
+ DMERR("Failed to open replicator log device \"%s\" [%d]",
+ path, r);
+ return ERR_PTR(r);
+ }
+
+ dev = dm_dev->bdev->bd_dev;
+	dev_size = replog_dev_size(dm_dev, params->dev.size);
+	if (!dev_size) {
+		dm_put_device(ti, dm_dev);
+		return ERR_PTR(-EINVAL);
+	}
+
+ /* Check if we already have a handle to this device. */
+ mutex_lock(&list_mutex);
+ l = replog_find(dev, params->dev.start);
+ if (IS_ERR(l)) {
+ mutex_unlock(&list_mutex);
+
+ if (unlikely(l == ERR_PTR(-EINVAL))) {
+ DMERR("Device open with different start offset!");
+ dm_put_device(ti, dm_dev);
+ return l;
+ }
+ } else {
+ /* Cannot create if there is an open reference. */
+ if (params->open_type == OT_CREATE) {
+ mutex_unlock(&list_mutex);
+ DMERR("OT_CREATE requested, but existing log found!");
+ dm_put_device(ti, dm_dev);
+ return ERR_PTR(-EPERM);
+ }
+
+ /* Take reference on replication log out. */
+ kref_get(&l->ref);
+ mutex_unlock(&list_mutex);
+
+ DMINFO("Found existing replog=%s", format_dev_t(buf, dev));
+
+ /* Found one, return it. */
+ log->context = l;
+ return l;
+ }
+
+ /*
+ * There is no open log, so time to look for one on disk.
+ */
+ l = kzalloc(sizeof(*l), GFP_KERNEL);
+ if (unlikely(!l)) {
+ DMERR("failed to allocate replicator log context");
+ dm_put_device(ti, dm_dev);
+ return ERR_PTR(-ENOMEM);
+ }
+
+ /* Preserve constructor parameters. */
+ l->params = *params;
+ l->params.dev.dm_dev = dm_dev;
+
+ log->context = l;
+ l->replog = log;
+
+ /* Init basic members. */
+ rwlock_init(&l->lists.lock);
+ rwlock_init(&l->lists.slinks.lock);
+ INIT_LIST_HEAD(&l->lists.slinks.list);
+
+ i = L_NR_LISTS;
+ while (i--)
+ INIT_LIST_HEAD(l->lists.l + i);
+
+ spin_lock_init(&l->io.lock);
+ bio_list_init(&l->io.in);
+
+ /* Take first reference out. */
+ kref_init(&l->ref);
+
+ /* Initialize ring buffer. */
+ r = ringbuffer_init(&l->ringbuffer);
+ if (unlikely(r < 0)) {
+ DMERR("failed to initialize ring buffer %d", r);
+ goto bad;
+ }
+
+	/* Preallocate to avoid stalling on OOM. */
+	l->io.buffer_header_disk =
+		kzalloc(dm_round_up(sizeof(*l->io.buffer_header_disk),
+			to_bytes(1)), GFP_KERNEL);
+ if (unlikely(!l->io.buffer_header_disk)) {
+ DMERR("failed to allocate ring buffer disk header");
+ r = -ENOMEM;
+ goto bad;
+ }
+
+ /*
+ * ringbuffer_io will only be called with I/O sizes of ti->split_io
+ * or fewer bytes, which are boundary checked too.
+ *
+ * The io_client needs to be setup before we can call log_init below.
+ */
+ io_client = dm_io_client_create(DEFAULT_BIOS * (1 + BIO_MAX_PAGES));
+ if (unlikely(IS_ERR(io_client))) {
+ DMERR("dm_io_client_create failed!");
+ r = PTR_ERR(io_client);
+ goto bad;
+ } else
+ l->io.io_client = io_client;
+
+ /* Create one worker per replog. */
+ l->io.wq = create_singlethread_workqueue(DAEMON);
+ if (unlikely(!l->io.wq)) {
+ DMERR("failed to create workqueue");
+ r = -ENOMEM;
+ goto bad;
+ } else
+ INIT_WORK(&l->io.ws, do_log);
+
+ /* Try to read an existing log or create a new one. */
+ r = log_init(l);
+ if (unlikely(r < 0))
+ goto bad;
+
+ stats_init(l);
+ ClearLogDevelStats(l);
+
+ /* Start out suspended, dm core will resume us. */
+ SetRingSuspended(&l->ringbuffer);
+ SetRingBlocked(&l->ringbuffer);
+
+ /* Link the new replog into the global list */
+ mutex_lock(&list_mutex);
+ list_add_tail(L_REPLOG_LIST(l), &replog_list);
+ mutex_unlock(&list_mutex);
+
+ return l;
+
+bad:
+ replog_destroy(l);
+ return ERR_PTR(r);
+}
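+
+/*
+ * Reference counting summary for replog_get()/replog_put() (descriptive
+ * only): the first opener of a given device/offset pair allocates the
+ * repl_log and takes the initial reference via kref_init(); any further
+ * opener of the same pair just does kref_get() on the existing instance.
+ * replog_put() drops one reference and frees the structure through
+ * replog_release() once the last holder is gone.
+ */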
+
+/* Account an entry for fallbehind and put it on the copy list. */
+static void
+entry_account_and_copy(struct ringbuffer_entry *entry)
+{
+ unsigned long slink_nr;
+ struct repl_log *l;
+
+ _BUG_ON_PTR(entry);
+ l = ringbuffer_repl_log(entry->ring);
+ _BUG_ON_PTR(l);
+
+	/* If there are no outstanding copies for this entry -> bail out. */
+ if (!entry_busy(l, ENTRY_SLINKS(entry)))
+ return;
+
+ _BUG_ON_PTR(entry->ring);
+
+ /* Account for fallbehind. */
+ for_each_bit(slink_nr, ENTRY_SLINKS(entry), l->slink.max) {
+ struct dm_repl_slink *slink = slink_find(l, slink_nr);
+
+ if (!IS_ERR(slink))
+ slink_fallbehind_inc(slink, entry);
+ }
+
+ /*
+ * Initiate copies across all SLINKS by moving to
+ * copy list in order. Because we are already processing
+ * do_log before do_slink_ios(), we need not call wake_do_log.
+ */
+ list_move_tail(E_WRITE_OR_COPY_LIST(entry), L_SLINK_COPY_LIST(l));
+}
+
+/* Adjust the log size. */
+static void
+do_log_resize(struct repl_log *l)
+{
+ int r = 0;
+ sector_t size_cur = l->header.log->size,
+ size_dev = l->params.dev.size;
+
+ /* If size change requested, adjust when possible. */
+ if (size_cur != size_dev) {
+ int write = 0;
+ int grow = size_dev > size_cur;
+ struct ringbuffer *ring = &l->ringbuffer;
+
+ mutex_lock(&ring->mutex);
+
+ /* Ringbuffer empty easy case. */
+ r = ringbuffer_empty_nolock(ring);
+ if (r) {
+			ring->head = ring->tail =
+				ring->next_avail = ring->start;
+ write = true;
+ /* Ringbuffer grow easy case. */
+ /* FIXME: check for device size valid! */
+ } else if (grow) {
+ write = true;
+ /* Ringbuffer shrink case. */
+ } else if (ring->head < ring->tail &&
+ max(ring->tail, ring->next_avail) < size_dev)
+ write = true;
+
+ if (write) {
+ ring->end = l->header.log->size = size_dev;
+ ring->free = ring_free(ring);
+ mutex_unlock(&ring->mutex);
+
+ r = log_header_io(WRITE, l);
+ if (r)
+ DMERR("failed to write log header "
+ "while resizing!");
+ else
+ DMINFO("%sing ringbuffer to %llu sectors",
+ grow ? "grow" : "shrink",
+ (unsigned long long) size_dev);
+ } else {
+ mutex_unlock(&ring->mutex);
+ r = 0;
+ }
+
+ ClearLogResize(l);
+ }
+}
+
+/*
+ * Initialize logs incore metadata.
+ */
+static void
+do_log_init(struct repl_log *l)
+{
+ int entries = 0, r;
+ sector_t sector;
+ struct ringbuffer *ring = &l->ringbuffer;
+ struct ringbuffer_entry *entry;
+
+ /* NOOP in case we're initialized already. */
+ if (TestSetLogInitialized(l))
+ return;
+
+ DMDEBUG("%s ring->head=%llu ring->tail=%llu",
+ __func__,
+ (unsigned long long) ring->head,
+ (unsigned long long) ring->tail);
+
+ /* Nothing to do if the log is empty */
+ if (ringbuffer_empty(ring))
+ goto out;
+
+ /*
+ * Start at head and walk to tail, queuing I/O to slinks.
+ */
+ for (sector = ring->head; sector != ring->tail;) {
+ int r;
+ struct data_header *header;
+
+ entry = ringbuffer_alloc_entry(ring, NULL); /* No bio alloc. */
+ header = entry->data.header;
+ r = data_header_io(READ, l, header, sector);
+ if (unlikely(r < 0)) {
+ /*
+ * FIXME: as written, this is not recoverable.
+ * All ios have to be errored because
+ * of RingBufferError().
+ */
+			ringbuffer_error(RING_BUFFER_HEADER_ERROR, ring, r);
+ ringbuffer_free_entry(entry);
+ break;
+ } else {
+ /* Set synchronous I/O policy mask. */
+ set_sync_mask(l, entry);
+
+ /* Adjust ring->free for any skipped sectors. */
+ ring->free -= sectors_skipped(ring, header);
+
+ /*
+ * Mark sector range busy in case the
+ * entry hasn't been copied to slink0 yet.
+ */
+ if (slink_test_bit(0, ENTRY_SLINKS(entry)))
+ sector_range_mark_busy(entry);
+
+ /*
+ * Account entry for fallbehind and
+ * put on slink copy list if needed.
+ */
+ entry_account_and_copy(entry);
+
+ /* Advance past this entry. */
+ sector = unlikely(next_entry_wraps(header)) ?
+ ring->start : next_start(header);
+ entries++;
+ }
+ }
+
+ DMINFO("found %d entries in the log", entries);
+
+ /* Advance head past any already copied entries. */
+ r = ringbuffer_advance_head(__func__, ring);
+ if (r >= 0)
+ DMINFO("%d entries freed", r);
+ else
+ DMERR_LIMIT("Error %d advancing ring buffer head!", r);
+
+out:
+ ClearRingBlocked(ring);
+ notify_caller(l, READ, 0);
+}
+
+/*
+ * Conditionally endio a bio when no copies on sync slinks are pending.
+ *
+ * In case an error on site link 0 occurred, the bio will be errored!
+ */
+/*
+ * FIXME: in case of no synchronous site links, the entry hasn't hit
+ * the local device yet, so a potential I/O error on it isn't
+ * available while endio processing the bio.
+ */
+static void
+entry_nosync_endio(struct ringbuffer_entry *entry)
+{
+ struct bio *bio = entry->bios.write;
+
+ /* If all sync slinks processed (if any). */
+ if (bio && !entry_busy(ringbuffer_repl_log(entry->ring),
+ ENTRY_SYNC(entry))) {
+ DMDEBUG_LIMIT("Calling bio_endio with %u, bi_endio %p",
+ entry->data.header->region.size, bio->bi_end_io);
+
+ /* Only error in case of site link 0 errors. */
+ bio_endio(bio,
+ slink_test_bit(0, ENTRY_ERROR(entry)) ? -EIO : 0);
+ entry->bios.write = NULL;
+ }
+}
+
+/*
+ * Error endio the entries bio, mark the ring
+ * buffer entry invalid and advance the tail.
+ */
+static void
+entry_endio_invalid(struct repl_log *l, struct ringbuffer_entry *entry)
+{
+ int r;
+
+ DMDEBUG_LIMIT("entry %p header_err %lu, data_err %lu", entry,
+ entry->data.error.header, entry->data.error.data);
+ BUG_ON(!entry->bios.write);
+ bio_endio(entry->bios.write, -EIO);
+
+ /* Mark the header as invalid so it is not queued for slink copies. */
+ r = ringbuffer_mark_entry_invalid(&l->ringbuffer, entry);
+ if (unlikely(r < 0)) {
+ /* FIXME: XXX
+ * Take the device offline?
+ */
+ DMERR("%s: I/O to sector %llu of log device "
+ "failed, and failed to mark header "
+ "invalid. Taking device off-line.",
+ __func__,
+ (unsigned long long)
+ entry->data.header->region.sector);
+ }
+
+ ringbuffer_free_entry(entry);
+}
+
+static inline int
+cc_error_read(struct slink_copy_context *cc)
+{
+ return cc->error[ERR_DISK].read ||
+ cc->error[ERR_RAM].read;
+}
+
+static inline int
+cc_error_write(struct slink_copy_context *cc)
+{
+ return cc->error[ERR_DISK].write ||
+ cc->error[ERR_RAM].write;
+}
+
+static inline int
+cc_error(struct slink_copy_context *cc)
+{
+ return cc_error_read(cc) ||
+ cc_error_write(cc);
+}
+
+/*
+ * Set state of slink_copy_context to completion.
+ *
+ * slink_copy_context is the object describing a *single* copy
+ * of a particular ringbuffer entry to *one* site link.
+ *
+ * Called with list lock held.
+ */
+static void
+slink_copy_complete(struct slink_copy_context *cc)
+{
+ int slink_nr;
+ struct dm_repl_slink *slink = cc->slink;
+ struct ringbuffer_entry *entry = cc->entry;
+ struct repl_log *l = ringbuffer_repl_log(entry->ring);
+
+ _BUG_ON_PTR(slink);
+ _BUG_ON_PTR(slink->caller);
+ _BUG_ON_PTR(entry);
+ _BUG_ON_PTR(l);
+ slink_nr = slink->ops->slink_number(slink);
+ _BUG_ON_SLINK_NR(l, slink_nr);
+
+	/* The entry is no longer under I/O across this slink. */
+ slink_clear_bit(slink_nr, ENTRY_IOS(entry));
+
+ /* The slink is no longer under I/O. */
+ slink_clear_bit(slink_nr, LOG_SLINKS_IO(l));
+
+ /* Update the I/O threshold counters */
+ slink_fallbehind_dec(slink, entry);
+
+ DMDEBUG_LIMIT("processing I/O completion for slink%d", slink_nr);
+
+ if (unlikely(cc_error(cc)) &&
+ slink_test_bit(slink_nr, LOG_SLINKS(l))) {
+ slink_set_bit(slink_nr, ENTRY_ERROR(entry));
+ DMERR_LIMIT("copy on slink%d failed", slink_nr);
+ } else {
+ /* Flag entry copied to slink_nr. */
+ slink_clear_bit(slink_nr, ENTRY_SLINKS(entry));
+
+ /* Reset any sync copy request on entry to slink_nr. */
+ slink_clear_bit(slink_nr, ENTRY_SYNC(entry));
+ }
+
+ free_copy_context(cc, entry->ring);
+
+ /* Release slink state reference after completion. */
+ ss_io_put(slink->caller);
+}
+
+/* Check for entry with endios pending at ring buffer head. */
+static int
+ringbuffer_head_busy(struct repl_log *l)
+{
+ int r;
+ struct ringbuffer_entry *entry;
+
+ mutex_lock(&l->ringbuffer.mutex);
+
+ /*
+ * This shouldn't happen. Presumably this function is called
+ * when the ring buffer is overflowing, so you would expect
+ * at least one entry on the list!
+ */
+ if (unlikely(list_empty(L_ENTRY_ORDERED_LIST(l))))
+ goto out_unlock;
+
+ /* The first entry on this list is the ring head. */
+ entry = list_first_entry(L_ENTRY_ORDERED_LIST(l),
+ struct ringbuffer_entry,
+ lists.l[E_ORDERED]);
+ r = entry_endios_pending(entry);
+ mutex_unlock(&l->ringbuffer.mutex);
+ return r;
+
+out_unlock:
+ mutex_unlock(&l->ringbuffer.mutex);
+ DMERR_LIMIT("%s called with an empty ring!", __func__);
+ return 0;
+}
+
+/*
+ * Find the first ring buffer entry with outstanding copies
+ * and record each slink that hasn't completed the copy I/O.
+ */
+static int
+find_slow_slinks(struct repl_log *l, uint64_t *slow_slinks)
+{
+ int r = 0;
+ struct ringbuffer_entry *entry;
+
+ DMDEBUG("%s", __func__);
+ /* Needed for E_COPY_CONTEXT_LIST() access. */
+ list_for_each_entry(entry, L_SLINK_COPY_LIST(l),
+ lists.l[E_WRITE_OR_COPY]) {
+ int slink_nr;
+ struct slink_copy_context *cc;
+
+ /*
+ * There may or may not be slink copy contexts hanging
+ * off of the entry. If there aren't any, it means the
+ * copy has already completed.
+ */
+ list_for_each_entry(cc, E_COPY_CONTEXT_LIST(entry), list) {
+ struct dm_repl_slink *slink = cc->slink;
+
+ slink_nr = slink->ops->slink_number(slink);
+ _BUG_ON_SLINK_NR(l, slink_nr);
+ slink_set_bit(slink_nr, slow_slinks);
+ r = 1;
+ break;
+ }
+
+ }
+
+ if (r) {
+ /*
+ * Check to see if all slinks are slow! slink0 should
+ * not be slow, one would hope! But, we need to deal
+ * with that case.
+ */
+ if (slink_test_bit(0, slow_slinks)) {
+ struct slink_state *ss;
+
+ _BUG_ON_PTR(l->slink0);
+ ss = l->slink0->caller;
+ _BUG_ON_PTR(ss);
+
+ /*
+ * If slink0 is slow, there is
+ * obviously some other problem!
+ */
+ DMWARN("%s: slink0 copy taking a long time "
+ "(%u ms)", __func__,
+ jiffies_to_msecs(jiffies) -
+ jiffies_to_msecs(ss->fb.head_jiffies));
+ r = 0;
+ } else if (!memcmp(slow_slinks, LOG_SLINKS(l),
+ sizeof(LOG_SLINKS(l))))
+ r = 0;
+
+ if (!r)
+ memset(slow_slinks, 0, BITMAP_SIZE(l));
+ }
+
+ return r;
+}
+
+/* Check if entry has ios scheduled on slow slinks. */
+static int
+entry_is_slow(struct ringbuffer_entry *entry, uint64_t *slow_slinks)
+{
+ unsigned long slink_nr;
+
+ for_each_bit(slink_nr, ENTRY_IOS(entry),
+ ringbuffer_repl_log(entry->ring)->slink.max) {
+ if (test_bit(slink_nr, (void *) slow_slinks))
+ return 1;
+ }
+
+ return 0;
+}
+
+/*
+ * Cancel slink_copies to the slinks specified in the slow_slinks bitmask.
+ *
+ * This function starts at the beginning of the ordered slink copy list
+ * and frees up ring buffer entries which are waiting only for the slow
+ * slinks. This is accomplished by marking the regions under I/O as
+ * dirty in the slink dirty logs and advancing the ring head pointer.
+ * Once a ring buffer entry is encountered that is waiting for more
+ * than just the slinks specified, the function terminates.
+ */
+static void
+repl_log_cancel_copies(struct repl_log *l, uint64_t *slow_slinks)
+{
+ int r;
+ unsigned long slink_nr;
+ struct ringbuffer *ring = &l->ringbuffer;
+ struct ringbuffer_entry *entry;
+ struct dm_repl_slink *slink;
+ struct data_header_region *region;
+ struct slink_copy_context *cc, *n;
+ static uint64_t flush_slinks[BITMAP_ELEMS_MAX],
+ flush_error[BITMAP_ELEMS_MAX],
+ stall_slinks[BITMAP_ELEMS_MAX];
+
+ DMDEBUG("%s", __func__);
+ memset(flush_slinks, 0, BITMAP_SIZE(l));
+ memset(flush_error, 0, BITMAP_SIZE(l));
+ memset(stall_slinks, 0, BITMAP_SIZE(l));
+
+ /* First walk the entry list setting region nosync state. */
+ list_for_each_entry(entry, L_SLINK_COPY_LIST(l),
+ lists.l[E_WRITE_OR_COPY]) {
+ if (!entry_is_slow(entry, slow_slinks) ||
+ entry_endios_pending(entry))
+ break;
+
+ region = &entry->data.header->region;
+
+ /* Needed for E_COPY_CONTEXT_LIST() access. */
+ read_lock_irq(&l->lists.lock);
+
+ /* Walk the copy context list. */
+ list_for_each_entry_safe(cc, n, E_COPY_CONTEXT_LIST(entry),
+ list) {
+ slink = cc->slink;
+ _BUG_ON_PTR(slink);
+ slink_nr = slink->ops->slink_number(slink);
+ _BUG_ON_SLINK_NR(l, slink_nr);
+
+ /* Stall IO policy set. */
+ if (slink_stall(slink)) {
+ DMINFO_LIMIT("slink=%lu stall", slink_nr);
+ /*
+ * Keep stall policy in bitarray
+ * to avoid policy change race.
+ */
+ slink_set_bit(slink_nr, stall_slinks);
+ l->stats.stall++;
+ continue;
+ }
+
+ r = slink->ops->in_sync(slink,
+ region->dev, region->sector);
+ if (r)
+ slink_set_bit(slink_nr, flush_slinks);
+
+ r = slink->ops->set_sync(slink, region->dev,
+ region->sector, 0);
+ BUG_ON(r);
+ }
+
+ read_unlock_irq(&l->lists.lock);
+ }
+
+ /*
+ * The dirty logs of all devices on this slink must be flushed in
+ * this second step for performance reasons before advancing the
+ * ring head.
+ */
+ for_each_bit(slink_nr, (void *) flush_slinks, l->slink.max) {
+ slink = slink_find(l, slink_nr);
+ r = slink->ops->flush_sync(slink);
+
+ if (unlikely(r)) {
+ /*
+ * What happens when the region is
+ * marked but not flushed? Will we
+ * still get an endio?
+ * This code assumes not. -JEM
+ *
+ * If a region is marked sync, the slink
+ * code won't select it for resync,
+ * Hence we got to keep the buffer entries,
+ * because we can't assume resync is
+ * ever going to happen. -HJM
+ */
+ DMERR_LIMIT("error flushing dirty logs "
+ "on slink=%d",
+ slink->ops->slink_number(slink));
+ slink_set_bit(slink_nr, flush_error);
+ } else {
+ /* Trigger resynchronization on slink. */
+ r = slink->ops->resync(slink, 1);
+ BUG_ON(r);
+ }
+ }
+
+ /* Now release copy contexts, declaring copy completion. */
+ list_for_each_entry(entry, L_SLINK_COPY_LIST(l),
+ lists.l[E_WRITE_OR_COPY]) {
+ if (!entry_is_slow(entry, slow_slinks) ||
+ entry_endios_pending(entry))
+ break;
+
+ /* Needed for E_COPY_CONTEXT_LIST() access. */
+ write_lock_irq(&l->lists.lock);
+
+ /* Walk the copy context list. */
+ list_for_each_entry_safe(cc, n, E_COPY_CONTEXT_LIST(entry),
+ list) {
+ slink = cc->slink;
+ slink_nr = slink->ops->slink_number(slink);
+
+ /* Stall IO policy set. */
+ if (slink_test_bit(slink_nr, stall_slinks))
+ continue;
+
+ /* Error flushing dirty log, keep entry. */
+ if (unlikely(slink_test_bit(slink_nr, flush_error)))
+ continue;
+
+ BUG_ON(list_empty(&cc->list));
+ list_del_init(&cc->list);
+
+ /* Do not reference cc after this call. */
+ slink_copy_complete(cc);
+ }
+
+ write_unlock_irq(&l->lists.lock);
+ }
+
+ /*
+ * Now advance the head pointer to free up room in the ring buffer.
+ * In case we fail here, we've got both entries in the ring buffer
+ * *and* nosync regions to recover.
+ */
+ ringbuffer_advance_head(__func__, ring);
+}
+
+/*
+ * This function is called to free up some ring buffer space when a
+ * full condition is encountered. The basic idea is to walk through
+ * the list of outstanding copies and see which slinks are slow to
+ * respond. Then, we free up as many of the entries as possible and
+ * advance the ring head.
+ */
+static void
+ring_check_fallback(struct ringbuffer *ring)
+{
+ int r;
+ struct repl_log *l = ringbuffer_repl_log(ring);
+ static uint64_t slow_slinks[BITMAP_ELEMS_MAX];
+
+ DMDEBUG("%s", __func__);
+ /*
+ * First, check to see if we can simply
+ * free entries at the head of the ring.
+ */
+ r = ringbuffer_advance_head(__func__, ring);
+ if (r > 0) {
+ DMINFO_LIMIT("%s: able to advance head", __func__);
+ return;
+ }
+
+ /*
+ * Check to see if any entries at the head of the ring buffer
+ * are currently queued for completion. If they are, then
+ * don't do anything here; simply allow the I/O completion to
+ * proceed.
+ */
+ r = ringbuffer_head_busy(l);
+ if (r) {
+ DMINFO_LIMIT("%s: endios pending.", __func__);
+ return;
+ }
+
+ /*
+ * Take a look at the first entry in the copy list with outstanding
+ * I/O and figure out which slinks are holding up progress.
+ */
+ memset(slow_slinks, 0, BITMAP_SIZE(l));
+
+ r = find_slow_slinks(l, slow_slinks);
+ if (r) {
+ DMINFO_LIMIT("%s: slow slinks found.", __func__);
+ /*
+ * Now, walk the copy list from the beginning and free
+ * any entry which is awaiting copy completion from the
+ * slow slinks. Once we hit an entry which is awaiting
+ * completion from an slink other than the slow ones, we stop.
+ */
+ repl_log_cancel_copies(l, slow_slinks);
+ } else
+ DMINFO_LIMIT("%s: no slow slinks found.", __func__);
+}
+
+static int
+entry_error(struct ringbuffer_entry *entry)
+{
+ struct entry_data *data = &entry->data;
+
+ if (unlikely(data->error.header ||
+ data->error.data)) {
+ if (data->error.header)
+ ringbuffer_error(RING_BUFFER_HEADER_ERROR,
+ entry->ring, -EIO);
+
+ if (data->error.data)
+ ringbuffer_error(RING_BUFFER_DATA_ERROR,
+ entry->ring, -EIO);
+
+ return -EIO;
+ }
+
+ return 0;
+}
+
+/*
+ * Ring buffer endio processing. The ring buffer tail cannot be
+ * advanced until both the data and data_header portions are written
+ * to the log, AND all of the buffer I/Os preceding this one in the
+ * log have completed.
+ */
+#define MIN_ENTRIES_INACTIVE 128
+static void
+do_ringbuffer_endios(struct repl_log *l)
+{
+ int r;
+ unsigned count = 0;
+ struct ringbuffer *ring = &l->ringbuffer;
+ struct ringbuffer_entry *entry, *entry_last = NULL, *n;
+
+ DMDEBUG_LIMIT("%s", __func__);
+
+ /*
+ * The l->lists.entry.io list is sorted by on-disk order. The first
+ * entry in the list will correspond to the current ring buffer tail
+ * plus the size of the last valid entry. We process endios in
+ * order so that the tail is not advanced past unfinished entries.
+ */
+
+ list_for_each_entry(entry, L_ENTRY_RING_WRITE_LIST(l),
+ lists.l[E_WRITE_OR_COPY]) {
+ if (atomic_read(&entry->endios))
+ break;
+
+ count++;
+ entry_last = entry;
+ }
+
+ /* No inactive entries on list -> bail out. */
+ if (!count)
+ return;
+
+ BUG_ON(!entry_last);
+
+ /* Update the tail pointer once for a list of entries. */
+ DMDEBUG_LIMIT("%s advancing ring buffer tail %u entries",
+ __func__, count);
+ r = ringbuffer_advance_tail(entry_last);
+
+ /* Now check for any errored entries. */
+ list_for_each_entry_safe(entry, n, L_ENTRY_RING_WRITE_LIST(l),
+ lists.l[E_WRITE_OR_COPY]) {
+ struct entry_data *data = &entry->data;
+
+ _BUG_ON_PTR(data->disk_header);
+ free_data_header_disk(data->disk_header, ring);
+ data->disk_header = NULL;
+
+ ss_io_put(l->slink0->caller);
+
+ /*
+ * Tail update error before or header/data
+ * ring buffer write error -> error bio.
+ */
+ if (unlikely(r || entry_error(entry)))
+ entry_endio_invalid(l, entry);
+ else {
+ /*
+ * Handle the slink policy for sync vs. async here.
+ *
+ * Synchronous link means, that endio needs to be
+ * reported *after* the slink copy of the entry
+ * succeeded and *not* after the entry got stored
+ * in the ring buffer. -HJM
+ */
+ /* Endio bio in case of no sync slinks. */
+ entry_nosync_endio(entry);
+
+ /*
+ * Account entry for fallbehind
+ * and put on slink copy list.
+ *
+ * WARNING: removes entry from write list!
+ */
+ entry_account_and_copy(entry);
+ }
+
+ if (entry == entry_last)
+ break;
+ }
+
+ /* On ring full, check if we need to fall back to bitmap mode. */
+ if (RingBufferFull(ring))
+ ring_check_fallback(ring);
+
+ /* Wake up any waiters. */
+ wake_up(&ring->flushq);
+}
+
+/*
+ * Work all site link endios (i.e. all slink_copy contexts).
+ */
+static struct slink_copy_context *
+cc_pop(struct repl_log *l)
+{
+ struct slink_copy_context *cc;
+
+ /* Pop copy_context from copy contexts list. */
+ if (list_empty(L_SLINK_ENDIO_LIST(l)))
+ cc = NULL;
+ else {
+ cc = list_first_entry(L_SLINK_ENDIO_LIST(l),
+ struct slink_copy_context, list);
+ list_del(&cc->list);
+ }
+
+ return cc;
+}
+
+static void
+do_slink_endios(struct repl_log *l)
+{
+ int r;
+ LIST_HEAD(slink_endios);
+ struct ringbuffer *ring = &l->ringbuffer;
+ struct ringbuffer_entry *entry = NULL;
+ struct data_header *header;
+
+ DMDEBUG_LIMIT("%s", __func__);
+
+ while (1) {
+ int slink_nr;
+ struct slink_copy_context *cc;
+ struct dm_repl_slink *slink;
+
+ /* Pop copy_context from copy contexts list. */
+ write_lock_irq(&l->lists.lock);
+ cc = cc_pop(l);
+ if (!cc) {
+ write_unlock_irq(&l->lists.lock);
+ break;
+ }
+
+ /* No active copy on endios list! */
+ BUG_ON(atomic_read(&cc->cnt));
+
+ slink = cc->slink;
+ entry = cc->entry;
+
+ /* Do not reference cc after this call. */
+ slink_copy_complete(cc);
+
+ write_unlock_irq(&l->lists.lock);
+
+ _BUG_ON_PTR(slink);
+ _BUG_ON_PTR(slink->ops);
+ _BUG_ON_PTR(entry);
+
+ /*
+ * All reads are serviced from slink0 (for now), so mark
+ * sectors as no longer under I/O once the copy to slink0
+ * is complete.
+ */
+ slink_nr = slink->ops->slink_number(slink);
+ _BUG_ON_SLINK_NR(l, slink_nr);
+ if (!slink_nr)
+ sector_range_clear_busy(entry);
+
+ /* If all synchronous site links processed, endio here. */
+ entry_nosync_endio(entry);
+
+ /*
+ * Update data header on disk to reflect the ENTRY_SLINK
+ * change so that we don't pick up a copy which has
+ * finished again on restart.
+ *
+ * FIXME: this throttles throughput on fast site links.
+ */
+ header = entry->data.header;
+ _BUG_ON_PTR(header);
+ r = data_header_io(WRITE, l, header, header->pos.header);
+ if (unlikely(r < 0)) {
+ DMERR_LIMIT("Writing data header at %llu",
+ (unsigned long long) header->pos.header);
+
+ /* Flag error on all slinks because we can't recover. */
+ for_each_bit(slink_nr, LOG_SLINKS(l), l->slink.max)
+ slink_set_bit(slink_nr, ENTRY_ERROR(entry));
+ }
+ }
+
+ /*
+ * If all slinks are up-to-date, then we can advance
+ * the ring buffer head pointer and remove the entry
+ * from the slink copy list.
+ */
+ r = ringbuffer_advance_head(__func__, ring);
+ if (r < 0)
+ DMERR_LIMIT("Error %d advancing ring buffer head!", r);
+}
+
+/*
+ * Read a bio (partially) off of the log:
+ *
+ * o check if the bio's data is completely in the log
+ * -> redirect N reads to the log
+ * (N = 1 for simple cases to N > 1)
+ * o check if the bio's data is split between log and LD
+ * -> redirect N parts to the log
+ * -> redirect 1 part to the LD
+ * o if the bio's data is entirely on the LD
+ * -> redirect the bio to the LD
+ */
+#define DO_INFO1 \
+DMDEBUG_LIMIT("%s overlap for bio_range.start=%llu bio_range.end=%llu " \
+ "entry_range.start=%llu entry_range.end=%llu", __func__, \
+ (unsigned long long) bio_range.start, \
+ (unsigned long long) bio_range.end, \
+ (unsigned long long) entry_range.start, \
+ (unsigned long long) entry_range.end);
+#define DO_INFO2 \
+DMDEBUG_LIMIT("%s NO overlap for bio_range.start=%llu bio_range.end=%llu " \
+ "entry_range.start=%llu entry_range.end=%llu", __func__, \
+ (unsigned long long) bio_range.start, \
+ (unsigned long long) bio_range.end, \
+ (unsigned long long) entry_range.start, \
+ (unsigned long long) entry_range.end);
+static int
+bio_read(struct repl_log *l, struct bio *bio, struct list_head *buckets[2])
+{
+ int r;
+ unsigned i;
+ struct ringbuffer_entry *entry;
+ struct sector_range bio_range = {
+ .start = bio_begin(bio),
+ .end = bio_end(bio),
+ }, entry_range;
+
+ /* Figure overlapping areas. */
+ r = 0;
+	for (i = 0; i < 2 && buckets[i]; i++) {
+ /* Find entry from end of bucket. */
+ list_for_each_entry_reverse(entry, buckets[i],
+ lists.l[E_BUSY_HASH]) {
+ entry_range.start = entry->data.header->region.sector;
+ entry_range.end = entry_range.start +
+ round_up_to_sector(entry->data.header->region.size);
+
+ if (ranges_overlap(&bio_range, &entry_range)) {
+ if (bio_range.start >= entry_range.start &&
+ bio_range.end <= entry_range.end) {
+ sector_t off;
+
+ entry->bios.read = bio;
+ DO_INFO1;
+ off = bio_range.start -
+ entry_range.start;
+ ringbuffer_read_bio_vec(l, entry,
+ off, bio);
+ return 0;
+ } else
+ DO_INFO2;
+ } else
+ goto out;
+ }
+ }
+
+ /*
+ * slink->ops->io() will check if region is in sync
+ * and return -EAGAIN in case the I/O needs
+ * to be delayed. Returning -ENODEV etc. is fatal.
+ *
+ * WARNING: bio->bi_bdev changed after return!
+ */
+	/*
+	 * Reading off of the log:
+	 * o check if the bio's data is completely in the log
+	 * -> redirect N reads to the log
+	 * (N = 1 for simple cases to N > 1)
+	 * o check if the bio's data is split between log and LD
+	 * -> redirect N parts to the log
+	 * -> redirect 1 part to the LD
+	 * o if the bio's data is on the LD
+	 */
+out:
+ return -EAGAIN;
+}
+#undef DO_INFO1
+#undef DO_INFO2
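+
+/*
+ * Worked example for the containment check in bio_read() (illustrative
+ * sector numbers only): a READ covering sectors 100-107 against a logged
+ * write entry covering sectors 96-127 is fully contained, so it is served
+ * from the ring buffer at pos.data + (100 - 96).  A read that only
+ * partially overlaps an entry (e.g. sectors 120-135) is not split between
+ * log and local device; bio_read() returns -EAGAIN and the bio is retried
+ * later.
+ */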
+
+static int
+ringbuffer_read_bio(struct repl_log *l, struct bio *bio)
+{
+ int r;
+ struct ringbuffer *ring = &l->ringbuffer;
+ struct dm_repl_slink *slink0 = slink_find(l, 0);
+ struct list_head *buckets[2];
+
+ if (IS_ERR(slink0))
+ return PTR_ERR(slink0);
+
+ /*
+ * Check if there's writes pending to the area the bio intends
+ * to read and if so, satisfy request from ring buffer.
+ */
+ /* We've got writes in the log for this bio. */
+ r = ringbuffer_writes_pending(&ring->busy_sectors, bio, buckets);
+ if (r) {
+ atomic_inc(&l->stats.writes_pending);
+ r = bio_read(l, bio, buckets);
+ /* Simple case: no writes in the log for this bio. */
+ } else {
+ /*
+ * slink->ops->io() will check if region is in sync
+ * and return -EAGAIN in case the I/O needs
+ * to be delayed. Returning -ENODEV etc. is fatal.
+ *
+ * WARNING: bio->bi_bdev changed after return!
+ */
+ r = slink0->ops->io(slink0, bio, 0);
+ if (r < 0)
+ /* No retry possibility is fatal. */
+ BUG_ON(unlikely(r != -EAGAIN));
+ }
+
+ return r;
+}
+
+/* Work on any IOS queued into the ring buffer. */
+static void
+do_ringbuffer_ios(struct repl_log *l)
+{
+ int r;
+ struct bio *bio;
+ struct bio_list ios_in;
+
+ DMDEBUG_LIMIT("%s %u start", __func__, jiffies_to_msecs(jiffies));
+
+ bio_list_init(&ios_in);
+
+ /* Quickly grab the bio input list. */
+ spin_lock(&l->io.lock);
+ bio_list_merge(&ios_in, &l->io.in);
+ bio_list_init(&l->io.in);
+ spin_unlock(&l->io.lock);
+
+ while ((bio = bio_list_pop(&ios_in))) {
+		/* FATAL: ring buffer I/O error occurred! */
+ if (unlikely(RingBufferError(&l->ringbuffer)))
+ bio_endio(bio, -EIO);
+ else if (bio_data_dir(bio) == READ) {
+ r = ringbuffer_read_bio(l, bio);
+ /* We have to wait. */
+ if (r < 0) {
+ bio_list_push(&ios_in, bio);
+ break;
+ }
+ } else
+ /* Insert new write bio into ring buffer. */
+ ringbuffer_write_entry(l, bio);
+ }
+
+ DMDEBUG_LIMIT("%s %u end ", __func__, jiffies_to_msecs(jiffies));
+
+ if (!bio_list_empty(&ios_in)) {
+ spin_lock(&l->io.lock);
+ bio_list_merge_head(&l->io.in, &ios_in);
+ spin_unlock(&l->io.lock);
+ }
+}
+
+/*
+ * Set any slinks requested by the recovery callback to accessible.
+ *
+ * Needs doing in the main worker thread in order to avoid
+ * a race between do_slink_ios() and slink_recover_callback(),
+ * which is being called asynchronously from the slink module.
+ */
+static void
+do_slinks_accessible(struct repl_log *l)
+{
+ unsigned long slink_nr;
+
+ /* Reset any requested inaccessible bits. */
+ for_each_bit(slink_nr, LOG_SLINKS(l), l->slink.max) {
+ if (slink_test_bit(slink_nr, LOG_SLINKS_SET_ACCESSIBLE(l))) {
+ slink_clear_bit(slink_nr, LOG_SLINKS_INACCESSIBLE(l));
+ slink_clear_bit(slink_nr, LOG_SLINKS_SET_ACCESSIBLE(l));
+ }
+ }
+}
+
+/* Drop reference on a copy context and put on endio list on last drop. */
+static void
+slink_copy_context_put(struct slink_copy_context *cc)
+{
+ DMDEBUG_LIMIT("%s", __func__);
+
+ if (atomic_dec_and_test(&cc->cnt)) {
+ int slink_nr;
+ unsigned long flags;
+ struct repl_log *l = ringbuffer_repl_log(cc->entry->ring);
+ struct dm_repl_slink *slink = cc->slink;
+
+ /* last put, schedule completion */
+ DMDEBUG_LIMIT("last put, scheduling do_log");
+
+ _BUG_ON_PTR(l);
+ _BUG_ON_PTR(slink);
+ slink_nr = slink->ops->slink_number(slink);
+ _BUG_ON_SLINK_NR(l, slink_nr);
+
+ write_lock_irqsave(&l->lists.lock, flags);
+ BUG_ON(list_empty(&cc->list));
+ list_move_tail(&cc->list, L_SLINK_ENDIO_LIST(l));
+ write_unlock_irqrestore(&l->lists.lock, flags);
+
+ wake_do_log(l);
+ } else
+ BUG_ON(atomic_read(&cc->cnt) < 0);
+}
+
+enum slink_endio_type { SLINK_ENDIO_RAM, SLINK_ENDIO_DISK };
+static void
+slink_copy_endio(enum slink_endio_type type, int read_err, int write_err,
+ void *context)
+{
+ struct slink_copy_context *cc = context;
+ struct slink_copy_error *error;
+
+ DMDEBUG_LIMIT("%s", __func__);
+ _BUG_ON_PTR(cc);
+ error = cc->error;
+
+ if (type == SLINK_ENDIO_RAM) {
+ /* On RAM endio error, no disk callback will be performed. */
+ if (unlikely(read_err || write_err))
+ atomic_dec(&cc->cnt);
+
+ error += ERR_RAM;
+ } else
+ error += ERR_DISK;
+
+ error->read = read_err;
+ error->write = write_err;
+ slink_copy_context_put(cc);
+}
+
+/* Callback for copy in RAM. */
+static void
+slink_copy_ram_endio(int read_err, int write_err, void *context)
+{
+ slink_copy_endio(SLINK_ENDIO_RAM, read_err, write_err, context);
+}
+
+/* Callback for copy on disk. */
+static void
+slink_copy_disk_endio(int read_err, int write_err, void *context)
+{
+ slink_copy_endio(SLINK_ENDIO_DISK, read_err, write_err, context);
+}
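+
+/*
+ * Callback accounting for a single slink copy (descriptive only):
+ * slink_copy_context_alloc() initializes cc->cnt to NR_ENDIOS (2), one
+ * reference for the RAM stage and one for the disk stage of the copy.
+ * If the RAM stage already failed, no disk callback will follow, so
+ * slink_copy_endio() drops the extra reference itself.  On the last put
+ * the context is moved to the slink endio list and completed by the
+ * worker in slink_copy_complete().
+ */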
+
+/*
+ * Called back when:
+ *
+ * o site link recovered from failure
+ * o site link recovered a region.
+ */
+static void
+slink_recover_callback(int read_err, int write_err, void *context)
+{
+ unsigned slink_nr;
+ struct repl_log *l;
+ struct slink_state *ss = context;
+
+ _BUG_ON_PTR(ss);
+ l = ss->l;
+ _BUG_ON_PTR(l);
+ slink_nr = ss->slink_nr;
+ _BUG_ON_SLINK_NR(l, slink_nr);
+
+ DMDEBUG_LIMIT("%s slink=%d", __func__, slink_nr);
+
+ if (!read_err && !write_err)
+ slink_set_bit(slink_nr, LOG_SLINKS_SET_ACCESSIBLE(l));
+
+	/* Inform the caller that we're willing to receive more I/Os. */
+ notify_caller(l, WRITE, 0);
+
+ /* Wakeup worker to allow for further IO. */
+ wake_do_log(l);
+}
+
+/* Initialize slink_copy global properties independent of the entry. */
+static void
+slink_copy_init(struct dm_repl_slink_copy *slink_copy, struct repl_log *l)
+{
+ /*
+ * The source block device (ie. the ring buffer device)
+ * is the same for all I/Os.
+ */
+ slink_copy->src.type = DM_REPL_SLINK_BLOCK_DEVICE;
+ slink_copy->src.dev.bdev = repl_log_bdev(l);
+
+ /* The destination is identified by slink and device number. */
+ slink_copy->dst.type = DM_REPL_SLINK_DEV_NUMBER;
+
+ /* RAM, disk, slink recovery callbacks. */
+ slink_copy->ram.fn = slink_copy_ram_endio;
+ slink_copy->disk.fn = slink_copy_disk_endio;
+}
+
+/* Initialize slink_copy properties that depend on the entry. */
+static void
+slink_copy_addr(struct dm_repl_slink_copy *slink_copy,
+ struct ringbuffer_entry *entry)
+{
+ struct data_header *header = entry->data.header;
+ struct data_header_region *region;
+
+ _BUG_ON_PTR(header);
+ region = &header->region;
+ _BUG_ON_PTR(region);
+
+ /* The offset/size to copy from is given by the entry. */
+ slink_copy->src.sector = header->pos.data;
+
+ /* Most of the destination is the same across slinks. */
+ slink_copy->dst.dev.number.dev = region->dev;
+ slink_copy->dst.sector = region->sector;
+ slink_copy->size = region->size;
+}
+
+/* Allocate and initialize a slink_copy_context structure. */
+static inline struct slink_copy_context *
+slink_copy_context_alloc(struct ringbuffer_entry *entry,
+ struct dm_repl_slink *slink)
+{
+ struct slink_copy_context *cc = alloc_copy_context(entry->ring);
+
+ BUG_ON(!cc);
+ memset(cc, 0, sizeof(*cc));
+
+ /* NR_ENDIOS # of endios callbacks per copy (RAM and disk). */
+ atomic_set(&cc->cnt, NR_ENDIOS);
+ cc->entry = entry;
+ cc->slink = slink;
+ cc->start_jiffies = jiffies;
+ return cc;
+}
+
+/* Trigger/prohibit resynchronization on all site links. */
+enum resync_switch { RESYNC_OFF = 0, RESYNC_ON };
+static void
+resync_on_off(struct repl_log *l, enum resync_switch resync)
+{
+ unsigned long slink_nr;
+ struct dm_repl_slink *slink;
+
+ for_each_bit(slink_nr, LOG_SLINKS(l), l->slink.max) {
+ slink = slink_find(l, slink_nr);
+ if (!IS_ERR(slink))
+ slink->ops->resync(slink, resync);
+ }
+}
+
+/* Return true if all slinks processed (either active or inaccessible). */
+static int
+all_slinks_processed(struct repl_log *l)
+{
+ unsigned slinks = 0;
+ unsigned long slink_nr;
+
+ for_each_bit(slink_nr, LOG_SLINKS_IO(l), l->slink.max)
+ slinks++;
+
+ for_each_bit(slink_nr, LOG_SLINKS_INACCESSIBLE(l), l->slink.max)
+ slinks++;
+
+ return slinks >= l->slink.count;
+}
+
+/*
+ * Work all site link copy orders.
+ */
+static void
+do_slink_ios(struct repl_log *l)
+{
+ unsigned long slink_nr;
+ struct ringbuffer_entry *entry;
+ struct dm_repl_slink *slink;
+ static struct dm_repl_slink_copy slink_copy;
+
+	/* If there are no entries on the copy list, allow resync. */
+ if (list_empty(L_SLINK_COPY_LIST(l)))
+ return resync_on_off(l, RESYNC_ON);
+
+ /*
+ * ...else prohibit resync.
+ *
+ * We'll deal with any active resynchronization based
+ * on the return code of slink->ops->copy() below.
+ */
+ resync_on_off(l, RESYNC_OFF);
+
+ /*
+ * This list is ordered, how do we keep it so that endio processing
+ * is ordered? We need this so that head pointer advances in order.
+ *
+ * We do that by changing ringbuffer_advance_head() to check
+ * for entry_busy(l, ENTRY_SLINKS(entry))) and stop processing. -HJM
+ */
+
+ /* Initialize global properties, which are independent of the entry. */
+ slink_copy_init(&slink_copy, l);
+
+ /* Walk all entries on the slink copy list. */
+ list_for_each_entry(entry, L_SLINK_COPY_LIST(l),
+ lists.l[E_WRITE_OR_COPY]) {
+ int r;
+ unsigned copies = 0;
+
+ /* Check, if all slinks processed now. */
+ r = all_slinks_processed(l);
+ if (r)
+ break;
+
+ /* Set common parts independent of slink up. */
+ slink_copy_addr(&slink_copy, entry);
+
+ /* Walk all slinks, which still need this entry. */
+ for_each_bit(slink_nr, ENTRY_SLINKS(entry), l->slink.max) {
+ int teardown;
+ struct slink_copy_context *cc;
+ struct slink_state *ss;
+
+ /*
+ * One maximum write pending to slink already
+ * -or-
+ * slink is recovering this region.
+ */
+ if (slink_test_bit(slink_nr, LOG_SLINKS_IO(l)) ||
+ slink_test_bit(slink_nr,
+ LOG_SLINKS_INACCESSIBLE(l)))
+ continue;
+
+ /*
+ * Check for deleted or being torn down site link.
+ */
+ slink = slink_find(l, slink_nr);
+ if (unlikely(IS_ERR(slink))) {
+ DMERR_LIMIT("%s no slink!", __func__);
+ ss = NULL;
+ teardown = 0;
+ } else {
+ ss = slink->caller;
+ _BUG_ON_PTR(ss);
+ teardown = SsTeardown(ss);
+ }
+
+ if (unlikely(IS_ERR(slink) ||
+ teardown ||
+ !slink_test_bit(slink_nr,
+ LOG_SLINKS(l)))) {
+drop_copy:
+ if (IS_ERR(slink))
+ DMERR_LIMIT("%s: slink %lu not "
+ "configured!",
+ __func__, slink_nr);
+ else
+ /* Correct fallbehind account. */
+ slink_fallbehind_dec(slink, entry);
+
+ /* Flag entry copied to slink_nr. */
+ slink_clear_bit(slink_nr, ENTRY_SLINKS(entry));
+
+ /* Reset any sync copy request to slink_nr. */
+ slink_clear_bit(slink_nr, ENTRY_SYNC(entry));
+
+ if (!slink_nr)
+ sector_range_clear_busy(entry);
+
+ continue;
+ }
+
+ /* Take slink reference out. */
+ ss_io_get(ss);
+
+ /* Flag active copy to slink+entry, */
+ slink_set_bit(slink_nr, LOG_SLINKS_IO(l));
+ slink_set_bit(slink_nr, ENTRY_IOS(entry));
+
+ /* Fill in the destination slink number. */
+ slink_copy.dst.dev.number.slink = slink_nr;
+
+ /* Setup the callback data. */
+ cc = slink_copy_context_alloc(entry, slink);
+ BUG_ON(!cc);
+ slink_copy.ram.context = slink_copy.disk.context = cc;
+
+			/*
+			 * Add to the entry's list of active copies in
+			 * order to avoid a race with the ->copy() endio
+			 * function accessing cc->list.
+			 */
+ write_lock_irq(&l->lists.lock);
+ list_add_tail(&cc->list, E_COPY_CONTEXT_LIST(entry));
+ write_unlock_irq(&l->lists.lock);
+
+ DMDEBUG("slink0->ops->copy() from log, sector=%llu, "
+ "size=%u to dev_number=%d, sector=%llu "
+ "on slink=%u",
+ (unsigned long long) slink_copy.src.sector,
+ slink_copy.size,
+ slink_copy.dst.dev.number.dev,
+ (unsigned long long) slink_copy.dst.sector,
+ slink_copy.dst.dev.number.slink);
+
+
+ /*
+ * slink->ops->copy() may return:
+ *
+ * o -EAGAIN in case of prohibiting I/O because
+ * of device inaccessibility/suspension
+ * or device I/O errors
+			 *     (i.e. link temporarily down) ->
+ * caller is allowed to retry the I/O later once
+ * he'll have received a callback.
+ *
+ * o -EACCES in case a region is being resynchronized
+ * and the source region is being read to copy data
+			 *   across to the same region of the replica (RD) ->
+ * caller is allowed to retry the I/O later once
+ * he'll have received a callback.
+ *
+ * o -ENODEV in case a device is not configured
+ * caller must drop the I/O to the device/slink pair.
+ *
+ * o -EPERM in case a region is out of sync ->
+ * caller must drop the I/O to the device/slink pair.
+ */
+ r = slink->ops->copy(slink, &slink_copy, 0);
+ if (unlikely(r < 0)) {
+ DMDEBUG_LIMIT("Copy to slink%d/dev%d/"
+ "sector=%llu failed with %d.",
+ slink_copy.dst.dev.number.slink,
+ slink_copy.dst.dev.number.dev,
+ (unsigned long long)
+ slink_copy.dst.sector, r);
+
+				/*
+				 * Failed -> take it off the entry's copy
+				 * list and free the copy context.
+				 */
+ write_lock_irq(&l->lists.lock);
+ list_del_init(&cc->list);
+ write_unlock_irq(&l->lists.lock);
+
+ free_copy_context(cc, entry->ring);
+
+ /* Reset active I/O on slink+entry. */
+ slink_clear_bit(slink_nr, LOG_SLINKS_IO(l));
+ slink_clear_bit(slink_nr, ENTRY_IOS(entry));
+
+ /* Release slink reference. */
+ ss_io_put(ss);
+
+ /*
+ * Source region is being read for recovery
+				 * or device is temporarily inaccessible ->
+ * retry later once accessible again.
+ */
+ if (r == -EACCES ||
+ r == -EAGAIN) {
+ slink_set_bit(slink_nr,
+ LOG_SLINKS_INACCESSIBLE(l));
+
+ /*
+ * Device not on slink
+ * -or-
+ * region not in sync -> avoid copy.
+ */
+ } else if (r == -ENODEV ||
+ r == -EPERM)
+ goto drop_copy;
+ else
+ BUG();
+ } else
+ copies++;
+ }
+
+ if (copies)
+ l->stats.copy[copies > 1]++;
+ }
+}
+
+/* Unplug device queues with entries on all site links. */
+static void
+do_unplug(struct repl_log *l)
+{
+ struct dm_repl_slink *slink;
+ unsigned long slink_nr;
+
+ /* Conditionally unplug ring buffer. */
+ if (TestClearRingBufferIOQueued(&l->ringbuffer))
+ blk_unplug(bdev_get_queue(ringbuffer_bdev(&l->ringbuffer)));
+
+ /* Unplug any devices with queued IO on site links. */
+ for_each_bit(slink_nr, LOG_SLINKS(l), l->slink.max) {
+ slink = slink_find(l, slink_nr);
+ if (!IS_ERR(slink))
+ slink->ops->unplug(slink);
+ }
+}
+
+/* Take out/drop slink state references to synchronize with slink deletion. */
+enum reference_type { REF_GET, REF_PUT };
+static inline void
+ss_ref(enum reference_type type, struct repl_log *l)
+{
+ unsigned long slink_nr;
+ void (*f)(struct slink_state *) =
+ type == REF_GET ? ss_io_get : ss_io_put;
+
+ if (!l->slink0)
+ return;
+
+ for_each_bit(slink_nr, LOG_SLINKS(l), l->slink.max) {
+ struct dm_repl_slink *slink =
+ l->slink0->ops->slink(l->replog, slink_nr);
+
+ _BUG_ON_PTR(slink);
+ f(slink->caller);
+ }
+}
+
+/*
+ * Worker thread.
+ *
+ * Belabour any:
+ * o replicator log ring buffer initialization
+ * o endios on the ring buffer
+ * o endios on any site links
+ * o I/O on site links (copies of buffer entries via site links to [LR]Ds
+ * o I/O to the ring buffer
+ *
+ */
+static void
+do_log(struct work_struct *ws)
+{
+ struct repl_log *l = container_of(ws, struct repl_log, io.ws);
+
+ /* Take out references vs. removal races. */
+ spin_lock(&l->io.lock);
+ ss_ref(REF_GET, l);
+ spin_unlock(&l->io.lock);
+
+ if (!RingSuspended(&l->ringbuffer)) {
+ do_log_init(l);
+ do_log_resize(l);
+ }
+
+ /* Allow for endios at any time, even while suspended. */
+ do_ringbuffer_endios(l); /* Must be called before do_slink_ios. */
+
+ /* Don't allow for new I/Os while suspended. */
+ if (!RingSuspended(&l->ringbuffer)) {
+ int r;
+
+ do_slink_endios(l);
+ do_ringbuffer_ios(l);
+
+ /*
+ * Set any slinks requested to accessible
+ * before checking all_slinks_processed().
+ */
+ do_slinks_accessible(l);
+
+ /* Only initiate slink copies if not all slinks active. */
+ r = all_slinks_processed(l);
+ if (!r)
+ do_slink_ios(l);
+
+ do_unplug(l);
+ }
+
+ ss_ref(REF_PUT, l);
+}
+
+/*
+ * Start methods of "default" type
+ */
+/* Destroy a replicator log context. */
+static void
+ringbuffer_dtr(struct dm_repl_log *log, struct dm_target *ti)
+{
+ struct repl_log *l;
+
+ DMDEBUG("%s: log %p", __func__, log);
+ _SET_AND_BUG_ON_L(l, log);
+
+ /* Remove from the global list of replogs. */
+ mutex_lock(&list_mutex);
+ list_del_init(L_REPLOG_LIST(l));
+ mutex_unlock(&list_mutex);
+
+ replog_destroy(l);
+ BUG_ON(!replog_put(log, ti));
+}
+
+/*
+ * Construct a replicator log context.
+ *
+ * Arguments:
+ * #replog_params dev_path dev_start [auto/create/open [size]]
+ *
+ * dev_path = device path of replication log (REPLOG) backing store
+ * dev_start = offset in sectors to REPLOG header
+ *
+ * auto = causes open of an REPLOG with a valid header or
+ * creation of a new REPLOG in case the header's invalid.
+ * <#replog_params> = 2 or (3 and "open")
+ * -> the log device must be initialized or the constructor will fail.
+ * <#replog_params> = 4 and "auto"
+ * -> if not already initialized, the log device will get initialized
+ * and sized to "size", otherwise it'll be opened.
+ * <#replog_params> = 4 and 'create'
+ * -> the log device will get initialized if not active and sized to
+ * "size"; if the REPLOG is active 'create' will fail.
+ *
+ * The above roughly translates to:
+ * argv[0] == #params
+ * argv[1] == dev_name
+ * argv[2] == dev_start
+ * argv[3] == OT_OPEN|OT_CREATE|OT_AUTO
+ * argv[4] == size in sectors
+ */
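+/*
+ * Hypothetical example of the log parameter portion of a table line
+ * (device path and size are made up; the full table syntax is defined
+ * by the surrounding replicator target, not by this handler):
+ *
+ *	4 /dev/sdb1 0 auto 2097152
+ *
+ * i.e. 4 log parameters, REPLOG backing store on /dev/sdb1 with its
+ * header at sector 0, opened or created automatically, and sized to
+ * 2097152 sectors.
+ */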
+#define MIN_ARGS 3
+static int
+ringbuffer_ctr(struct dm_repl_log *log, struct dm_target *ti,
+ unsigned argc, char **argv)
+{
+ int open_type, params;
+ unsigned long long tmp;
+ struct repl_log *l;
+ struct repl_params p;
+
+ SHOW_ARGV;
+
+ if (unlikely(argc < MIN_ARGS))
+		DM_EINVAL("%s: at least 3 arguments required, only got %d",
+ __func__, argc);
+
+ memset(&p, 0, sizeof(p));
+
+ /* Get # of parameters. */
+	if (unlikely(sscanf(argv[0], "%d", &params) != 1 ||
+		     params < 2 ||
+		     params > 5)) {
+		DM_EINVAL("invalid replicator log parameter count");
+	} else
+		p.count = params;
+
+ if (params == 2)
+ open_type = OT_OPEN;
+ else {
+ open_type = _open_type(argv[3]);
+ if (unlikely(open_type < 0))
+ return -EINVAL;
+ else if (unlikely(open_type == OT_OPEN && params > 3))
+ DM_EINVAL("3 arguments required for open, %d given.",
+ params);
+ }
+
+ p.open_type = open_type;
+
+ if (params > 3) {
+ /* Get device size argument. */
+ if (unlikely(sscanf(argv[4], "%llu", &tmp) != 1 ||
+ tmp < LOG_SIZE_MIN)) {
+ DM_EINVAL("invalid replicator log device size");
+ } else
+ p.dev.size = tmp;
+
+ } else
+ p.dev.size = LOG_SIZE_MIN;
+
+ if (unlikely((open_type == OT_AUTO || open_type == OT_CREATE) &&
+ params < 4))
+ DM_EINVAL("4 arguments required for auto and create");
+
+ /* Get device start argument. */
+ if (unlikely(sscanf(argv[2], "%llu", &tmp) != 1))
+ DM_EINVAL("invalid replicator log device start");
+ else
+ p.dev.start = tmp;
+
+ /* Get a reference on the replog. */
+ l = replog_get(log, ti, argv[1], &p);
+ if (unlikely(IS_ERR(l)))
+ return PTR_ERR(l);
+
+ return 0;
+}
+
+/* Flush the current log contents. This function may block. */
+static int
+ringbuffer_flush(struct dm_repl_log *log)
+{
+ struct repl_log *l;
+ struct ringbuffer *ring;
+
+ DMDEBUG("%s", __func__);
+ _SET_AND_BUG_ON_L(l, log);
+ ring = &l->ringbuffer;
+
+ wake_do_log(l);
+ wait_event(ring->flushq, ringbuffer_empty(ring));
+ return 0;
+}
+
+/* Suspend method. */
+/*
+ * FIXME: we're suspending/resuming the whole ring buffer,
+ * not just the device requested. Avoiding this complete suspension
+ * would require knowing the reason for the suspension; e.g. in case
+ * of device removal, we could avoid suspending completely.
+ * We don't know how to optimize this without a bitmap for the
+ * devices, which would limit dev_numbers. -HJM
+ */
+static int
+ringbuffer_postsuspend(struct dm_repl_log *log, int dev_number)
+{
+ struct repl_log *l;
+
+ _SET_AND_BUG_ON_L(l, log);
+ flush_workqueue(l->io.wq);
+
+ if (TestSetRingSuspended(&l->ringbuffer))
+ DMWARN("%s ring buffer already suspended", __func__);
+
+ flush_workqueue(l->io.wq);
+ SetRingBlocked(&l->ringbuffer);
+ ss_all_wait_on_ios(l);
+ return 0;
+}
+
+/* Resume method. */
+static int
+ringbuffer_resume(struct dm_repl_log *log, int dev_number)
+{
+ struct repl_log *l;
+ struct ringbuffer *ring;
+
+ _SET_AND_BUG_ON_L(l, log);
+
+ ring = &l->ringbuffer;
+ if (!TestClearRingSuspended(ring))
+ DMWARN("%s ring buffer already resumed", __func__);
+
+ ClearRingBlocked(ring);
+ notify_caller(l, WRITE, 0);
+ wake_do_log(l);
+ return 0;
+}
+
+/*
+ * Queue a bio to the worker thread, ensuring that
+ * there's enough space for writes in the ring buffer.
+ */
+static inline int
+queue_bio(struct repl_log *l, struct bio *bio)
+{
+ int rw = bio_data_dir(bio);
+ struct ringbuffer *ring = &l->ringbuffer;
+
+ /*
+ * Try reserving space for the bio in the
+ * buffer and mark the sector range busy.
+ */
+ if (rw == WRITE) {
+ int r;
+
+ mutex_lock(&ring->mutex);
+ r = ringbuffer_reserve_space(ring, bio);
+ mutex_unlock(&ring->mutex);
+
+ /* Ring buffer full. */
+ if (r < 0)
+ return r;
+ }
+
+ spin_lock(&l->io.lock);
+ bio_list_add(&l->io.in, bio);
+ spin_unlock(&l->io.lock);
+
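+	/* Account the bio in the I/O counters (index 0 = reads, 1 = writes). */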
+ atomic_inc(l->stats.io + !!rw);
+ wake_do_log(l);
+ return 0;
+}
+
+/*
+ * Read a bio either from a replicator log's ring buffer
+ * or from the replicated device if no buffer entry.
+ * - or-
+ * write a bio to a replicator log's ring
+ * buffer (increments buffer tail).
+ *
+ * This includes buffer allocation in case of a write and
+ * initiation of copies across one or multiple SLINK(s).
+ *
+ * In case of a read with (partial) writes in the buffer,
+ * the replog may postpone the read until the buffer content has
+ * been copied across the local SLINK *or* optimize by reading
+ * (parts of) the bio off the buffer.
+ */
+/*
+ * Returns 0 on success, -EWOULDBLOCK if this is a WRITE request
+ * and buffer space could not be allocated. Returns -EWOULDBLOCK if
+ * this is a READ request and the call would block due to the
+ * requested region being currently under WRITE I/O.
+ */
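+/*
+ * On -EWOULDBLOCK the ring is marked blocked; the caller is expected to
+ * retry once it is signalled through the notify callback registered via
+ * ringbuffer_io_notify_fn_set() (see ringbuffer_resume(), which clears
+ * the block and calls notify_caller()).
+ */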
+static int
+ringbuffer_io(struct dm_repl_log *log, struct bio *bio, unsigned long long tag)
+{
+ int r = 0;
+ struct repl_log *l;
+ struct ringbuffer *ring;
+
+ _SET_AND_BUG_ON_L(l, log);
+ ring = &l->ringbuffer;
+
+ if (RingBlocked(ring) ||
+ !LogInitialized(l))
+ goto out_blocked;
+
+ if (unlikely(RingSuspended(ring)))
+ goto set_blocked;
+
+ /*
+ * Queue writes to the daemon in order to avoid sleeping
+ * on allocations. queue_bio() checks to see if there is
+ * enough space in the log for this bio and all of the
+ * other bios currently queued for the daemon.
+ */
+ r = queue_bio(l, bio);
+ if (!r)
+ return r;
+
+set_blocked:
+ SetRingBlocked(ring);
+out_blocked:
+ DMDEBUG_LIMIT("%s Ring blocked", __func__);
+ return -EWOULDBLOCK;
+}
+
+/* Set maximum slink # for bitarray access optimization. */
+static void replog_set_slink_max(struct repl_log *l)
+{
+ unsigned long bit_nr;
+
+ l->slink.max = 0;
+ for_each_bit(bit_nr, LOG_SLINKS(l), MAX_DEFAULT_SLINKS)
+ l->slink.max = bit_nr;
+
+ l->slink.max++;
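+	/*
+	 * Round the slink count up to whole 64-bit bitmap words:
+	 * bits -> bytes (/ BITS_PER_BYTE), bytes -> uint64_t elements.
+	 */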
+ BITMAP_ELEMS(l) = dm_div_up(dm_div_up(l->slink.max, BITS_PER_BYTE),
+ sizeof(uint64_t));
+ BITMAP_SIZE(l) = BITMAP_ELEMS(l) * sizeof(uint64_t);
+}
+
+/* Set replog global I/O notification function and context. */
+static void
+ringbuffer_io_notify_fn_set(struct dm_repl_log *log,
+ dm_repl_notify_fn fn, void *notify_context)
+{
+ struct repl_log *l;
+
+ _SET_AND_BUG_ON_L(l, log);
+
+ spin_lock(&l->io.lock);
+ l->notify.fn = fn;
+ l->notify.context = notify_context;
+ spin_unlock(&l->io.lock);
+}
+
+/* Add (tie) a site link to a replication log for SLINK copy processing. */
+static int
+ringbuffer_slink_add(struct dm_repl_log *log, struct dm_repl_slink *slink)
+{
+ int slink_nr;
+ struct repl_log *l;
+ struct slink_state *ss;
+
+ /* FIXME: XXX lock the repl_log */
+ DMDEBUG("ringbuffer_slink_add");
+ _BUG_ON_PTR(slink);
+ _SET_AND_BUG_ON_L(l, log);
+
+ /* See if slink was already added. */
+ slink_nr = slink->ops->slink_number(slink);
+ if (slink_nr >= MAX_DEFAULT_SLINKS)
+ DM_EINVAL("slink number larger than maximum "
+ "for 'default' replication log.");
+
+ DMDEBUG("%s: attempting to add slink%d", __func__, slink_nr);
+
+ /* No entry -> add a new one. */
+ ss = kzalloc(sizeof(*ss), GFP_KERNEL);
+ if (unlikely(!ss))
+ return -ENOMEM;
+
+ ss->slink_nr = slink_nr;
+ ss->l = l;
+ atomic_set(&ss->io.in_flight, 0);
+ init_waitqueue_head(&ss->io.waiters);
+
+ spin_lock(&l->io.lock);
+
+ if (unlikely(slink->caller)) {
+ spin_unlock(&l->io.lock);
+ kfree(ss);
+ DMERR("slink already exists.");
+ return -EEXIST;
+ }
+
+ ClearSsTeardown(ss);
+
+ /* Keep slink state reference. */
+ slink->caller = ss;
+
+ if (!slink_nr)
+ l->slink0 = slink;
+
+ l->slink.count++;
+
+ /* Set site link recovery notification. */
+ slink->ops->recover_notify_fn_set(slink, slink_recover_callback, ss);
+
+ /* Update log_header->slinks bit mask before setting max slink #! */
+ slink_set_bit(slink_nr, LOG_SLINKS(l));
+
+ /* Set maximum slink # for bitarray access optimization. */
+ replog_set_slink_max(l);
+
+ spin_unlock(&l->io.lock);
+ return 0;
+}
+
+/* Remove (untie) a site link from a replication log. */
+/*
+ * How do we tell if this is a configuration change or just a shutdown?
+ * After _repl_ctr, the RDs on the site link are either there or not.
+ */
+static int
+ringbuffer_slink_del(struct dm_repl_log *log, struct dm_repl_slink *slink)
+{
+ int r, slink_nr;
+ struct repl_log *l;
+ struct ringbuffer *ring;
+ struct slink_state *ss;
+
+ DMDEBUG("%s", __func__);
+ _BUG_ON_PTR(slink);
+ _SET_AND_BUG_ON_L(l, log);
+ ring = &l->ringbuffer;
+
+ /* Find entry to be deleted. */
+ slink_nr = slink->ops->slink_number(slink);
+ DMDEBUG("%s slink_nr=%d", __func__, slink_nr);
+
+ spin_lock(&l->io.lock);
+ ss = slink->caller;
+ if (likely(ss)) {
+ BUG_ON(atomic_read(&ss->io.in_flight));
+
+ /* No new I/Os on this slink and no duplicate deletion calls. */
+ if (TestSetSsTeardown(ss)) {
+ spin_unlock(&l->io.lock);
+ return -EPERM;
+ }
+
+ /* Wait on worker and any async I/O to finish on site link. */
+ do {
+ spin_unlock(&l->io.lock);
+ ss_wait_on_io(ss);
+ spin_lock(&l->io.lock);
+
+ if (!ss_io(ss)) {
+ slink_clear_bit(slink_nr, LOG_SLINKS(l));
+ slink->caller = NULL;
+ slink->ops->recover_notify_fn_set(slink,
+ NULL, NULL);
+ if (!slink_nr)
+ l->slink0 = NULL;
+
+ l->slink.count--;
+ replog_set_slink_max(l); /* Set l->slink.max. */
+ }
+ } while (slink->caller);
+
+ spin_unlock(&l->io.lock);
+
+ BUG_ON(l->slink.count < 0);
+ kfree(ss);
+ DMDEBUG("%s removed slink=%u", __func__, slink_nr);
+ r = 0;
+ } else {
+ spin_unlock(&l->io.lock);
+ r = -EINVAL;
+ }
+
+ wake_do_log(l);
+ return r;
+}
+
+/* Return head of the list of site links for this replicator log. */
+static struct dm_repl_log_slink_list
+*ringbuffer_slinks(struct dm_repl_log *log)
+{
+ struct repl_log *l;
+
+ _SET_AND_BUG_ON_L(l, log);
+ return &l->lists.slinks;
+}
+
+/* Return maximum number of supported site links. */
+static int
+ringbuffer_slink_max(struct dm_repl_log *log)
+{
+ return MAX_DEFAULT_SLINKS;
+}
+
+/*
+ * Message interface
+ *
+ * 'sta[tistics] {on,of[f],r[eset]}'	# e.g. 'stat of'
+ * 'resize <size_in_sectors>'		# e.g. 'resize 2097152'
+ */
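+/*
+ * Hypothetical examples (assuming the replicator target routes messages
+ * to this log handler; the device name is made up):
+ *
+ *	dmsetup message replicated0 0 statistics on
+ *	dmsetup message replicated0 0 resize 4194304
+ */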
+static int
+ringbuffer_message(struct dm_repl_log *log, unsigned argc, char **argv)
+{
+ static const char stat[] = "statistics";
+ static const char resize[] = "resize";
+ struct repl_log *l;
+
+ _SET_AND_BUG_ON_L(l, log);
+
+ if (argc != 2)
+ DM_EINVAL("Invalid number of arguments.");
+
+ if (!strnicmp(STR_LEN(argv[0], stat))) {
+ if (!strnicmp(STR_LEN(argv[1], "on")))
+ set_bit(LOG_DEVEL_STATS, &l->io.flags);
+ else if (!strnicmp(STR_LEN(argv[1], "off")))
+ clear_bit(LOG_DEVEL_STATS, &l->io.flags);
+ else if (!strnicmp(STR_LEN(argv[1], "reset")))
+ stats_init(l);
+ else
+ DM_EINVAL("Invalid '%s' arguments.", stat);
+ } else if (!strnicmp(STR_LEN(argv[0], resize))) {
+ if (TestSetLogResize(l))
+ DM_EPERM("Log resize already in progress");
+ else {
+ unsigned long long tmp;
+ sector_t dev_size;
+
+ if (unlikely(sscanf(argv[1], "%llu", &tmp) != 1) ||
+ tmp < LOG_SIZE_MIN)
+ DM_EINVAL("Invalid log %s argument.", resize);
+
+ dev_size = replog_dev_size(l->params.dev.dm_dev, tmp);
+ if (!dev_size)
+ DM_EINVAL("Invalid log size requested.");
+
+ l->params.dev.size = tmp;
+ wake_do_log(l); /* Let the worker do the resize. */
+ }
+ } else
+ DM_EINVAL("Invalid argument.");
+
+ return 0;
+}
+
+/* Support function for replicator log status requests. */
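+/*
+ * STATUSTYPE_INFO output: per configured slink " {S|A},<outstanding ios>,
+ * <outstanding sectors>" (S if SsSync() is set, A otherwise), followed by
+ * " <ios total>/<sectors total>/<log device size in sectors>".
+ */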
+static int
+ringbuffer_status(struct dm_repl_log *log, int dev_number,
+ status_type_t type, char *result, unsigned int maxlen)
+{
+ unsigned long slink_nr;
+ size_t sz = 0;
+ sector_t ios, sectors;
+ char buf[BDEVNAME_SIZE];
+ struct repl_log *l;
+ struct stats *s;
+ struct ringbuffer *ring;
+ struct repl_params *p;
+
+ _SET_AND_BUG_ON_L(l, log);
+ s = &l->stats;
+ ring = &l->ringbuffer;
+ p = &l->params;
+
+ switch (type) {
+ case STATUSTYPE_INFO:
+ ios = sectors = 0;
+
+ /* Output ios/sectors stats. */
+ spin_lock(&l->io.lock);
+ for_each_bit(slink_nr, LOG_SLINKS(l), l->slink.max) {
+ struct dm_repl_slink *slink = slink_find(l, slink_nr);
+ struct slink_state *ss;
+
+ _BUG_ON_PTR(slink);
+ ss = slink->caller;
+ _BUG_ON_PTR(ss);
+
+ DMEMIT(" %s,%llu,%llu",
+ SsSync(ss) ? "S" : "A",
+ (unsigned long long) ss->fb.outstanding.ios,
+ (unsigned long long) ss->fb.outstanding.sectors);
+ ios += ss->fb.outstanding.ios;
+ sectors += ss->fb.outstanding.sectors;
+ }
+
+ DMEMIT(" %llu/%llu/%llu",
+ (unsigned long long) ios,
+ (unsigned long long) sectors,
+ (unsigned long long) l->params.dev.size);
+
+ spin_unlock(&l->io.lock);
+
+ if (LogDevelStats(l))
+ DMEMIT(" ring->start=%llu "
+ "ring->head=%llu ring->tail=%llu "
+ "ring->next_avail=%llu ring->end=%llu "
+ "ring_free=%llu wrap=%d r=%d w=%d wp=%d he=%d "
+ "hash_insert=%u hash_insert_max=%u "
+ "single=%u multi=%u stall=%u",
+ (unsigned long long) ring->start,
+ (unsigned long long) ring->head,
+ (unsigned long long) ring->tail,
+ (unsigned long long) ring->next_avail,
+ (unsigned long long) ring->end,
+ (unsigned long long) ring_free(ring),
+ s->wrap,
+ atomic_read(s->io + 0), atomic_read(s->io + 1),
+ atomic_read(&s->writes_pending),
+ atomic_read(&s->hash_elem),
+ s->hash_insert, s->hash_insert_max,
+ s->copy[0], s->copy[1],
+ s->stall);
+
+ break;
+
+ case STATUSTYPE_TABLE:
+ DMEMIT("%s %d %s %llu", ringbuffer_type.type.name, p->count,
+ format_dev_t(buf, p->dev.dm_dev->bdev->bd_dev),
+ (unsigned long long) p->dev.start);
+
+ if (p->count > 2) {
+ DMEMIT(" %s", _open_str(p->open_type));
+
+ if (p->count > 3)
+ DMEMIT(" %llu",
+ (unsigned long long) p->dev.size);
+ }
+ }
+
+ return 0;
+}
+
+/*
+ * End methods of "ringbuffer" type
+ */
+
+/* "ringbuffer" replication log type. */
+static struct dm_repl_log_type ringbuffer_type = {
+ .type.name = "ringbuffer",
+ .type.module = THIS_MODULE,
+
+ .ctr = ringbuffer_ctr,
+ .dtr = ringbuffer_dtr,
+
+ .postsuspend = ringbuffer_postsuspend,
+ .resume = ringbuffer_resume,
+ .flush = ringbuffer_flush,
+ .io = ringbuffer_io,
+ .io_notify_fn_set = ringbuffer_io_notify_fn_set,
+
+ .slink_add = ringbuffer_slink_add,
+ .slink_del = ringbuffer_slink_del,
+ .slinks = ringbuffer_slinks,
+ .slink_max = ringbuffer_slink_max,
+
+ .message = ringbuffer_message,
+ .status = ringbuffer_status,
+};
+
+/* Destroy kmem caches on module unload. */
+static int
+replog_kmem_caches_exit(void)
+{
+ struct cache_defs *pd = ARRAY_END(cache_defs);
+
+ while (pd-- > cache_defs) {
+ if (unlikely(!pd->slab_pool))
+ continue;
+
+ DMDEBUG("Destroying kmem_cache %p", pd->slab_pool);
+ kmem_cache_destroy(pd->slab_pool);
+ pd->slab_pool = NULL;
+ }
+
+ return 0;
+}
+
+/* Create kmem caches on module load. */
+static int
+replog_kmem_caches_init(void)
+{
+ int r = 0;
+ struct cache_defs *pd = ARRAY_END(cache_defs);
+
+ while (pd-- > cache_defs) {
+ BUG_ON(pd->slab_pool);
+
+ /* No slab pool. */
+ if (!pd->size)
+ continue;
+
+ pd->slab_pool = kmem_cache_create(pd->slab_name, pd->size,
+ pd->align, 0, NULL);
+ if (likely(pd->slab_pool))
+ DMDEBUG("Created kmem_cache %p", pd->slab_pool);
+ else {
+ DMERR("failed to create slab %s for replication log "
+			      "handler %s %s",
+ pd->slab_name, ringbuffer_type.type.name,
+ version);
+ replog_kmem_caches_exit();
+ r = -ENOMEM;
+ break;
+ }
+ }
+
+ return r;
+}
+
+int __init
+dm_repl_log_init(void)
+{
+ int r;
+
+ if (sizeof(struct data_header_disk) != DATA_HEADER_DISK_SIZE)
+ DM_EINVAL("invalid size of 'struct data_header_disk' for %s %s",
+ ringbuffer_type.type.name, version);
+
+ mutex_init(&list_mutex);
+
+ r = replog_kmem_caches_init();
+ if (r < 0) {
+ DMERR("failed to init %s kmem caches %s",
+ ringbuffer_type.type.name, version);
+ return r;
+ }
+
+ r = dm_register_type(&ringbuffer_type, DM_REPLOG);
+ if (r < 0) {
+ DMERR("failed to register replication log %s handler %s [%d]",
+ ringbuffer_type.type.name, version, r);
+ replog_kmem_caches_exit();
+ } else
+ DMINFO("registered replication log %s handler %s",
+ ringbuffer_type.type.name, version);
+
+ return r;
+}
+
+void __exit
+dm_repl_log_exit(void)
+{
+ int r = dm_unregister_type(&ringbuffer_type, DM_REPLOG);
+
+ replog_kmem_caches_exit();
+
+ if (r)
+ DMERR("failed to unregister replication log %s handler %s [%d]",
+ ringbuffer_type.type.name, version, r);
+ else
+ DMINFO("unregistered replication log %s handler %s",
+ ringbuffer_type.type.name, version);
+}
+
+/* Module hooks */
+module_init(dm_repl_log_init);
+module_exit(dm_repl_log_exit);
+
+MODULE_DESCRIPTION(DM_NAME " remote replication target \"ringbuffer\" "
+ "log handler");
+MODULE_AUTHOR("Jeff Moyer <jmoyer@redhat.com>, "
+	      "Heinz Mauelshagen <heinzm@redhat.com>");
+MODULE_LICENSE("GPL");