@@ -17,3 +17,4 @@ common-obj-y += filter.o
common-obj-y += filter-buffer.o
common-obj-y += filter-mirror.o
common-obj-y += colo-compare.o
+common-obj-y += colo.o
@@ -26,13 +26,38 @@
#include "sysemu/char.h"
#include "qemu/sockets.h"
#include "qapi-visit.h"
+#include "net/colo.h"
+#include "trace.h"
#define TYPE_COLO_COMPARE "colo-compare"
#define COLO_COMPARE(obj) \
OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
#define COMPARE_READ_LEN_MAX NET_BUFSIZE
+#define MAX_QUEUE_SIZE 1024
+/*
+ + CompareState ++
+ | |
+ +---------------+ +---------------+ +---------------+
+ |conn list +--->conn +--------->conn |
+ +---------------+ +---------------+ +---------------+
+ | | | | | |
+ +---------------+ +---v----+ +---v----+ +---v----+ +---v----+
+ |primary | |secondary |primary | |secondary
+ |packet | |packet + |packet | |packet +
+ +--------+ +--------+ +--------+ +--------+
+ | | | |
+ +---v----+ +---v----+ +---v----+ +---v----+
+ |primary | |secondary |primary | |secondary
+ |packet | |packet + |packet | |packet +
+ +--------+ +--------+ +--------+ +--------+
+ | | | |
+ +---v----+ +---v----+ +---v----+ +---v----+
+ |primary | |secondary |primary | |secondary
+ |packet | |packet + |packet | |packet +
+ +--------+ +--------+ +--------+ +--------+
+*/
typedef struct CompareState {
Object parent;
@@ -44,6 +69,9 @@ typedef struct CompareState {
CharDriverState *chr_out;
SocketReadState pri_rs;
SocketReadState sec_rs;
+
+ /* hashtable to save connection */
+ GHashTable *connection_track_table;
} CompareState;
typedef struct CompareClass {
@@ -54,6 +82,76 @@ typedef struct CompareChardevProps {
bool is_socket;
} CompareChardevProps;
+enum {
+ PRIMARY_IN = 0,
+ SECONDARY_IN,
+};
+
+static int compare_chr_send(CharDriverState *out,
+ const uint8_t *buf,
+ uint32_t size);
+
+/*
+ * Return 0 on success, if return -1 means the pkt
+ * is unsupported(arp and ipv6) and will be sent later
+ */
+static int packet_enqueue(CompareState *s, int mode)
+{
+ Packet *pkt = NULL;
+
+ if (mode == PRIMARY_IN) {
+ pkt = packet_new(s->pri_rs.buf, s->pri_rs.packet_len);
+ } else {
+ pkt = packet_new(s->sec_rs.buf, s->sec_rs.packet_len);
+ }
+
+ if (parse_packet_early(pkt)) {
+ packet_destroy(pkt, NULL);
+ pkt = NULL;
+ return -1;
+ }
+ /* TODO: get connection key from pkt */
+
+ /*
+ * TODO: use connection key get conn from
+ * connection_track_table
+ */
+
+ /*
+ * TODO: insert pkt to it's conn->primary_list
+ * or conn->secondary_list
+ */
+
+ return 0;
+}
+
+static int compare_chr_send(CharDriverState *out,
+ const uint8_t *buf,
+ uint32_t size)
+{
+ int ret = 0;
+ uint32_t len = htonl(size);
+
+ if (!size) {
+ return 0;
+ }
+
+ ret = qemu_chr_fe_write_all(out, (uint8_t *)&len, sizeof(len));
+ if (ret != sizeof(len)) {
+ goto err;
+ }
+
+ ret = qemu_chr_fe_write_all(out, (uint8_t *)buf, size);
+ if (ret != size) {
+ goto err;
+ }
+
+ return 0;
+
+err:
+ return ret < 0 ? ret : -EIO;
+}
+
static char *compare_get_pri_indev(Object *obj, Error **errp)
{
CompareState *s = COLO_COMPARE(obj);
@@ -101,12 +199,21 @@ static void compare_set_outdev(Object *obj, const char *value, Error **errp)
static void compare_pri_rs_finalize(SocketReadState *pri_rs)
{
- /* if packet_enqueue pri pkt failed we will send unsupported packet */
+ CompareState *s = container_of(pri_rs, CompareState, pri_rs);
+
+ if (packet_enqueue(s, PRIMARY_IN)) {
+ trace_colo_compare_main("primary: unsupported packet in");
+ compare_chr_send(s->chr_out, pri_rs->buf, pri_rs->packet_len);
+ }
}
static void compare_sec_rs_finalize(SocketReadState *sec_rs)
{
- /* if packet_enqueue sec pkt failed we will notify trace */
+ CompareState *s = container_of(sec_rs, CompareState, sec_rs);
+
+ if (packet_enqueue(s, SECONDARY_IN)) {
+ trace_colo_compare_main("secondary: unsupported packet in");
+ }
}
static int compare_chardev_opts(void *opaque,
@@ -202,6 +309,8 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize);
net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize);
+ /* use g_hash_table_new_full() to new a hashtable */
+
return;
}
new file mode 100644
@@ -0,0 +1,86 @@
+/*
+ * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
+ * (a.k.a. Fault Tolerance or Continuous Replication)
+ *
+ * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
+ * Copyright (c) 2016 FUJITSU LIMITED
+ * Copyright (c) 2016 Intel Corporation
+ *
+ * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later. See the COPYING file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "trace.h"
+#include "net/colo.h"
+
+int parse_packet_early(Packet *pkt)
+{
+ int network_length;
+ static const uint8_t vlan[] = {0x81, 0x00};
+ uint8_t *data = pkt->data;
+ uint16_t l3_proto;
+ ssize_t l2hdr_len = eth_get_l2_hdr_length(data);
+
+ if (pkt->size < ETH_HLEN) {
+ trace_colo_proxy_main("pkt->size < ETH_HLEN");
+ return 1;
+ }
+
+ /*
+ * TODO: support vlan.
+ */
+ if (!memcmp(&data[12], vlan, sizeof(vlan))) {
+ trace_colo_proxy_main("COLO-proxy don't support vlan");
+ return 1;
+ }
+
+ pkt->network_header = data + l2hdr_len;
+
+ const struct iovec l2vec = {
+ .iov_base = (void *) data,
+ .iov_len = l2hdr_len
+ };
+ l3_proto = eth_get_l3_proto(&l2vec, 1, l2hdr_len);
+
+ if (l3_proto != ETH_P_IP) {
+ return 1;
+ }
+
+ network_length = pkt->ip->ip_hl * 4;
+ if (pkt->size < l2hdr_len + network_length) {
+ trace_colo_proxy_main("pkt->size < network_header + network_length");
+ return 1;
+ }
+ pkt->transport_header = pkt->network_header + network_length;
+
+ return 0;
+}
+
+Packet *packet_new(const void *data, int size)
+{
+ Packet *pkt = g_slice_new(Packet);
+
+ pkt->data = g_memdup(data, size);
+ pkt->size = size;
+
+ return pkt;
+}
+
+void packet_destroy(void *opaque, void *user_data)
+{
+ Packet *pkt = opaque;
+
+ g_free(pkt->data);
+ g_slice_free(Packet, pkt);
+}
+
+/*
+ * Clear hashtable, stop this hash growing really huge
+ */
+void connection_hashtable_reset(GHashTable *connection_track_table)
+{
+ g_hash_table_remove_all(connection_track_table);
+}
new file mode 100644
@@ -0,0 +1,37 @@
+/*
+ * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
+ * (a.k.a. Fault Tolerance or Continuous Replication)
+ *
+ * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
+ * Copyright (c) 2016 FUJITSU LIMITED
+ * Copyright (c) 2016 Intel Corporation
+ *
+ * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later. See the COPYING file in the top-level directory.
+ */
+
+#ifndef QEMU_COLO_PROXY_H
+#define QEMU_COLO_PROXY_H
+
+#include "slirp/slirp.h"
+
+#define HASHTABLE_MAX_SIZE 16384
+
+typedef struct Packet {
+ void *data;
+ union {
+ uint8_t *network_header;
+ struct ip *ip;
+ };
+ uint8_t *transport_header;
+ int size;
+} Packet;
+
+int parse_packet_early(Packet *pkt);
+void connection_hashtable_reset(GHashTable *connection_track_table);
+Packet *packet_new(const void *data, int size);
+void packet_destroy(void *opaque, void *user_data);
+
+#endif /* QEMU_COLO_PROXY_H */
@@ -140,6 +140,12 @@ memory_region_subpage_write(int cpu_index, void *mr, uint64_t offset, uint64_t v
memory_region_tb_read(int cpu_index, uint64_t addr, uint64_t value, unsigned size) "cpu %d addr %#"PRIx64" value %#"PRIx64" size %u"
memory_region_tb_write(int cpu_index, uint64_t addr, uint64_t value, unsigned size) "cpu %d addr %#"PRIx64" value %#"PRIx64" size %u"
+# net/colo.c
+colo_proxy_main(const char *chr) ": %s"
+
+# net/colo-compare.c
+colo_compare_main(const char *chr) ": %s"
+
### Guest events, keep at bottom
# @vaddr: Access' virtual address.