/* $NetBSD: pgfs_db.c,v 1.3 2012/04/11 14:27:43 yamt Exp $ */ /*- * Copyright (c)2010,2011 YAMAMOTO Takashi, * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ /* * backend db operations */ #include #ifndef lint __RCSID("$NetBSD: pgfs_db.c,v 1.3 2012/04/11 14:27:43 yamt Exp $"); #endif /* not lint */ #include #include #include #include #include #include #include #include #include #include #include #include "pgfs_db.h" #include "pgfs_waitq.h" #include "pgfs_debug.h" bool pgfs_dosync = false; struct Xconn { TAILQ_ENTRY(Xconn) list; PGconn *conn; struct puffs_cc *blocker; struct puffs_cc *owner; bool in_trans; int id; const char *label; }; static void dumperror(struct Xconn *xc, const PGresult *res) { static const struct { const char *name; int code; } fields[] = { #define F(x) { .name = #x, .code = x, } F(PG_DIAG_SEVERITY), F(PG_DIAG_SQLSTATE), F(PG_DIAG_MESSAGE_PRIMARY), F(PG_DIAG_MESSAGE_DETAIL), F(PG_DIAG_MESSAGE_HINT), F(PG_DIAG_STATEMENT_POSITION), F(PG_DIAG_INTERNAL_POSITION), F(PG_DIAG_INTERNAL_QUERY), F(PG_DIAG_CONTEXT), F(PG_DIAG_SOURCE_FILE), F(PG_DIAG_SOURCE_LINE), F(PG_DIAG_SOURCE_FUNCTION), #undef F }; unsigned int i; if (!pgfs_dodprintf) { return; } assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR || PQresultStatus(res) == PGRES_FATAL_ERROR); for (i = 0; i < __arraycount(fields); i++) { const char *val = PQresultErrorField(res, fields[i].code); if (val == NULL) { continue; } fprintf(stderr, "%s: %s\n", fields[i].name, val); } } TAILQ_HEAD(, Xconn) xclist = TAILQ_HEAD_INITIALIZER(xclist); struct waitq xcwaitq = TAILQ_HEAD_INITIALIZER(xcwaitq); static struct Xconn * getxc(struct puffs_cc *cc) { struct Xconn *xc; assert(cc != NULL); retry: TAILQ_FOREACH(xc, &xclist, list) { if (xc->blocker == NULL) { assert(xc->owner == NULL); xc->owner = cc; DPRINTF("xc %p acquire %p\n", xc, cc); return xc; } else { assert(xc->owner == xc->blocker); } } DPRINTF("no free conn %p\n", cc); waiton(&xcwaitq, cc); goto retry; } static void relxc(struct Xconn *xc) { assert(xc->in_trans); assert(xc->owner != NULL); xc->in_trans = false; xc->owner = NULL; wakeup_one(&xcwaitq); } static void pqwait(struct Xconn *xc) { PGconn *conn = xc->conn; struct puffs_cc *cc = xc->owner; if (PQflush(conn)) { errx(EXIT_FAILURE, "PQflush: %s", PQerrorMessage(conn)); } if (!PQisBusy(conn)) { return; } assert(xc->blocker == NULL); xc->blocker = cc; DPRINTF("yielding %p\n", cc); /* XXX is it safe to yield before entering mainloop? */ puffs_cc_yield(cc); DPRINTF("yield returned %p\n", cc); assert(xc->owner == cc); assert(xc->blocker == cc); xc->blocker = NULL; } static int sqltoerrno(const char *sqlstate) { /* * XXX hack; ERRCODE_INTERNAL_ERROR -> EAGAIN to handle * "tuple concurrently updated" errors for lowrite/lo_truncate. * * XXX should map ERRCODE_OUT_OF_MEMORY to EAGAIN? */ static const struct { char sqlstate[5]; int error; } map[] = { { "00000", 0, }, /* ERRCODE_SUCCESSFUL_COMPLETION */ { "02000", ENOENT, }, /* ERRCODE_NO_DATA */ { "23505", EEXIST, }, /* ERRCODE_UNIQUE_VIOLATION */ { "23514", EINVAL, }, /* ERRCODE_CHECK_VIOLATION */ { "40001", EAGAIN, }, /* ERRCODE_T_R_SERIALIZATION_FAILURE */ { "40P01", EAGAIN, }, /* ERRCODE_T_R_DEADLOCK_DETECTED */ { "42704", ENOENT, }, /* ERRCODE_UNDEFINED_OBJECT */ { "53100", ENOSPC, }, /* ERRCODE_DISK_FULL */ { "53200", ENOMEM, }, /* ERRCODE_OUT_OF_MEMORY */ { "XX000", EAGAIN, }, /* ERRCODE_INTERNAL_ERROR */ }; unsigned int i; for (i = 0; i < __arraycount(map); i++) { if (!memcmp(map[i].sqlstate, sqlstate, 5)) { const int error = map[i].error; if (error != 0) { DPRINTF("sqlstate %5s mapped to error %d\n", sqlstate, error); } if (error == EINVAL) { /* * sounds like a bug. */ abort(); } return error; } } DPRINTF("unknown sqlstate %5s mapped to EIO\n", sqlstate); return EIO; } struct cmd { char name[32]; /* name of prepared statement */ char *cmd; /* query string */ unsigned int nparams; Oid *paramtypes; uint32_t prepared_mask; /* for which connections this is prepared? */ unsigned int flags; /* CMD_ flags */ }; #define CMD_NOPREPARE 1 /* don't prepare this command */ struct cmd * createcmd(const char *cmd, unsigned int flags, ...) { struct cmd *c; va_list ap; const char *cp; unsigned int i; static unsigned int cmdid; c = emalloc(sizeof(*c)); c->cmd = estrdup(cmd); c->nparams = 0; va_start(ap, flags); for (cp = cmd; *cp != 0; cp++) { if (*cp == '$') { /* XXX */ c->nparams++; } } c->paramtypes = emalloc(c->nparams * sizeof(*c->paramtypes)); for (i = 0; i < c->nparams; i++) { Oid type = va_arg(ap, Oid); assert(type == BYTEA || type == INT4OID || type == INT8OID || type == OIDOID || type == TEXTOID || type == TIMESTAMPTZOID); c->paramtypes[i] = type; } va_end(ap); snprintf(c->name, sizeof(c->name), "%u", cmdid++); if ((flags & CMD_NOPREPARE) != 0) { c->prepared_mask = ~0; } else { c->prepared_mask = 0; } c->flags = flags; return c; } static void freecmd(struct cmd *c) { free(c->paramtypes); free(c->cmd); free(c); } static int fetch_noresult(struct Xconn *xc) { PGresult *res; ExecStatusType status; PGconn *conn = xc->conn; int error; pqwait(xc); res = PQgetResult(conn); if (res == NULL) { return ENOENT; } status = PQresultStatus(res); if (status == PGRES_COMMAND_OK) { assert(PQnfields(res) == 0); assert(PQntuples(res) == 0); if (!strcmp(PQcmdTuples(res), "0")) { error = ENOENT; } else { error = 0; } } else if (status == PGRES_FATAL_ERROR) { error = sqltoerrno(PQresultErrorField(res, PG_DIAG_SQLSTATE)); assert(error != 0); dumperror(xc, res); } else { errx(1, "%s not command_ok: %d: %s", __func__, (int)status, PQerrorMessage(conn)); } PQclear(res); res = PQgetResult(conn); assert(res == NULL); if (error != 0) { DPRINTF("error %d\n", error); } return error; } static int preparecmd(struct Xconn *xc, struct cmd *c) { PGconn *conn = xc->conn; const uint32_t mask = 1 << xc->id; int error; int ret; if ((c->prepared_mask & mask) != 0) { return 0; } assert((c->flags & CMD_NOPREPARE) == 0); DPRINTF("PREPARE: '%s'\n", c->cmd); ret = PQsendPrepare(conn, c->name, c->cmd, c->nparams, c->paramtypes); if (!ret) { errx(EXIT_FAILURE, "PQsendPrepare: %s", PQerrorMessage(conn)); } error = fetch_noresult(xc); if (error != 0) { return error; } c->prepared_mask |= mask; return 0; } /* * vsendcmd: * * resultmode is just passed to PQsendQueryParams/PQsendQueryPrepared. * 0 for text and 1 for binary. */ static int vsendcmd(struct Xconn *xc, int resultmode, struct cmd *c, va_list ap) { PGconn *conn = xc->conn; char **paramvalues; int *paramlengths; int *paramformats; unsigned int i; int error; int ret; assert(xc->owner != NULL); assert(xc->blocker == NULL); error = preparecmd(xc, c); if (error != 0) { return error; } paramvalues = emalloc(c->nparams * sizeof(*paramvalues)); paramlengths = NULL; paramformats = NULL; DPRINTF("CMD: '%s'\n", c->cmd); for (i = 0; i < c->nparams; i++) { Oid type = c->paramtypes[i]; char tmpstore[1024]; const char *buf = NULL; intmax_t v = 0; /* XXXgcc */ int sz; bool binary = false; switch (type) { case BYTEA: buf = va_arg(ap, const void *); sz = (int)va_arg(ap, size_t); binary = true; break; case INT8OID: case OIDOID: case INT4OID: switch (type) { case INT8OID: v = (intmax_t)va_arg(ap, int64_t); break; case OIDOID: v = (intmax_t)va_arg(ap, Oid); break; case INT4OID: v = (intmax_t)va_arg(ap, int32_t); break; default: errx(EXIT_FAILURE, "unknown integer oid %u", type); } buf = tmpstore; sz = snprintf(tmpstore, sizeof(tmpstore), "%jd", v); assert(sz != -1); assert((size_t)sz < sizeof(tmpstore)); sz += 1; break; case TEXTOID: case TIMESTAMPTZOID: buf = va_arg(ap, char *); sz = strlen(buf) + 1; break; default: errx(EXIT_FAILURE, "%s: unknown param type %u", __func__, type); } if (binary) { if (paramlengths == NULL) { paramlengths = emalloc(c->nparams * sizeof(*paramformats)); } if (paramformats == NULL) { paramformats = ecalloc(1, c->nparams * sizeof(*paramformats)); } paramformats[i] = 1; paramlengths[i] = sz; } paramvalues[i] = emalloc(sz); memcpy(paramvalues[i], buf, sz); if (binary) { DPRINTF("\t[%u]=\n", i); } else { DPRINTF("\t[%u]='%s'\n", i, paramvalues[i]); } } if ((c->flags & CMD_NOPREPARE) != 0) { ret = PQsendQueryParams(conn, c->cmd, c->nparams, c->paramtypes, (const char * const *)paramvalues, paramlengths, paramformats, resultmode); } else { ret = PQsendQueryPrepared(conn, c->name, c->nparams, (const char * const *)paramvalues, paramlengths, paramformats, resultmode); } for (i = 0; i < c->nparams; i++) { free(paramvalues[i]); } free(paramvalues); free(paramlengths); free(paramformats); if (!ret) { errx(EXIT_FAILURE, "PQsendQueryPrepared: %s", PQerrorMessage(conn)); } return 0; } int sendcmd(struct Xconn *xc, struct cmd *c, ...) { va_list ap; int error; va_start(ap, c); error = vsendcmd(xc, 0, c, ap); va_end(ap); return error; } int sendcmdx(struct Xconn *xc, int resultmode, struct cmd *c, ...) { va_list ap; int error; va_start(ap, c); error = vsendcmd(xc, resultmode, c, ap); va_end(ap); return error; } /* * simplecmd: a convenient routine to execute a command which returns * no rows synchronously. */ int simplecmd(struct Xconn *xc, struct cmd *c, ...) { va_list ap; int error; va_start(ap, c); error = vsendcmd(xc, 0, c, ap); va_end(ap); if (error != 0) { return error; } return fetch_noresult(xc); } void fetchinit(struct fetchstatus *s, struct Xconn *xc) { s->xc = xc; s->res = NULL; s->cur = 0; s->nrows = 0; s->done = false; } static intmax_t getint(const char *str) { intmax_t i; char *ep; errno = 0; i = strtoimax(str, &ep, 10); assert(errno == 0); assert(str[0] != 0); assert(*ep == 0); return i; } static int vfetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, va_list ap) { PGconn *conn = s->xc->conn; unsigned int i; assert(conn != NULL); if (s->res == NULL) { ExecStatusType status; int error; pqwait(s->xc); s->res = PQgetResult(conn); if (s->res == NULL) { s->done = true; return ENOENT; } status = PQresultStatus(s->res); if (status == PGRES_FATAL_ERROR) { error = sqltoerrno( PQresultErrorField(s->res, PG_DIAG_SQLSTATE)); assert(error != 0); dumperror(s->xc, s->res); return error; } if (status != PGRES_TUPLES_OK) { errx(1, "not tuples_ok: %s", PQerrorMessage(conn)); } assert((unsigned int)PQnfields(s->res) == n); s->nrows = PQntuples(s->res); if (s->nrows == 0) { DPRINTF("no rows\n"); return ENOENT; } assert(s->nrows >= 1); s->cur = 0; } for (i = 0; i < n; i++) { size_t size; assert((types[i] != BYTEA) == (PQfformat(s->res, i) == 0)); DPRINTF("[%u] PQftype = %d, types = %d, value = '%s'\n", i, PQftype(s->res, i), types[i], PQgetisnull(s->res, s->cur, i) ? "" : PQfformat(s->res, i) == 0 ? PQgetvalue(s->res, s->cur, i) : ""); assert(PQftype(s->res, i) == types[i]); assert(!PQgetisnull(s->res, s->cur, i)); switch(types[i]) { case INT8OID: *va_arg(ap, int64_t *) = getint(PQgetvalue(s->res, s->cur, i)); break; case OIDOID: *va_arg(ap, Oid *) = getint(PQgetvalue(s->res, s->cur, i)); break; case INT4OID: *va_arg(ap, int32_t *) = getint(PQgetvalue(s->res, s->cur, i)); break; case TEXTOID: *va_arg(ap, char **) = estrdup(PQgetvalue(s->res, s->cur, i)); break; case BYTEA: size = PQgetlength(s->res, s->cur, i); memcpy(va_arg(ap, void *), PQgetvalue(s->res, s->cur, i), size); *va_arg(ap, size_t *) = size; break; default: errx(EXIT_FAILURE, "%s unknown type %u", __func__, types[i]); } } s->cur++; if (s->cur == s->nrows) { PQclear(s->res); s->res = NULL; } return 0; } int fetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, ...) { va_list ap; int error; va_start(ap, types); error = vfetchnext(s, n, types, ap); va_end(ap); return error; } void fetchdone(struct fetchstatus *s) { if (s->res != NULL) { PQclear(s->res); s->res = NULL; } if (!s->done) { PGresult *res; unsigned int n; n = 0; while ((res = PQgetResult(s->xc->conn)) != NULL) { PQclear(res); n++; } if (n > 0) { DPRINTF("%u rows dropped\n", n); } } } int simplefetch(struct Xconn *xc, Oid type, ...) { struct fetchstatus s; va_list ap; int error; fetchinit(&s, xc); va_start(ap, type); error = vfetchnext(&s, 1, &type, ap); va_end(ap); assert(error != 0 || s.res == NULL); fetchdone(&s); return error; } /* * setlabel: set the descriptive label for the connection. * * we use simple pointer comparison for label equality check. */ static void setlabel(struct Xconn *xc, const char *label) { int error; /* * put the label into application_name so that it's shown in * pg_stat_activity. we are sure that our labels don't need * PQescapeStringConn. * * example: * SELECT pid,application_name,query FROM pg_stat_activity * WHERE state <> 'idle' */ if (label != NULL && label != xc->label) { struct cmd *c; char cmd_str[1024]; snprintf(cmd_str, sizeof(cmd_str), "SET application_name TO 'pgfs: %s'", label); c = createcmd(cmd_str, CMD_NOPREPARE); error = simplecmd(xc, c); freecmd(c); assert(error == 0); xc->label = label; } else { #if 0 /* don't bother to clear label */ static struct cmd *c; CREATECMD_NOPARAM(c, "SET application_name TO 'pgfs'"); error = simplecmd(xc, c); assert(error == 0); #endif } } struct Xconn * begin(struct puffs_usermount *pu, const char *label) { struct Xconn *xc = getxc(puffs_cc_getcc(pu)); static struct cmd *c; int error; setlabel(xc, label); CREATECMD_NOPARAM(c, "BEGIN"); assert(!xc->in_trans); error = simplecmd(xc, c); assert(error == 0); assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS); xc->in_trans = true; return xc; } struct Xconn * begin_readonly(struct puffs_usermount *pu, const char *label) { struct Xconn *xc = getxc(puffs_cc_getcc(pu)); static struct cmd *c; int error; setlabel(xc, label); CREATECMD_NOPARAM(c, "BEGIN READ ONLY"); assert(!xc->in_trans); error = simplecmd(xc, c); assert(error == 0); assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS); xc->in_trans = true; return xc; } void rollback(struct Xconn *xc) { PGTransactionStatusType status; /* * check the status as we are not sure the status of our transaction * after a failed commit. */ status = PQtransactionStatus(xc->conn); assert(status != PQTRANS_ACTIVE); assert(status != PQTRANS_UNKNOWN); if (status != PQTRANS_IDLE) { static struct cmd *c; int error; assert(status == PQTRANS_INTRANS || status == PQTRANS_INERROR); CREATECMD_NOPARAM(c, "ROLLBACK"); error = simplecmd(xc, c); assert(error == 0); } DPRINTF("xc %p rollback %p\n", xc, xc->owner); setlabel(xc, NULL); relxc(xc); } int commit(struct Xconn *xc) { static struct cmd *c; int error; CREATECMD_NOPARAM(c, "COMMIT"); error = simplecmd(xc, c); setlabel(xc, NULL); if (error == 0) { DPRINTF("xc %p commit %p\n", xc, xc->owner); relxc(xc); } return error; } int commit_sync(struct Xconn *xc) { static struct cmd *c; int error; assert(!pgfs_dosync); CREATECMD_NOPARAM(c, "SET LOCAL SYNCHRONOUS_COMMIT TO ON"); error = simplecmd(xc, c); assert(error == 0); return commit(xc); } static void pgfs_notice_receiver(void *vp, const PGresult *res) { struct Xconn *xc = vp; assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR); fprintf(stderr, "got a notice on %p\n", xc); dumperror(xc, res); } static int pgfs_readframe(struct puffs_usermount *pu, struct puffs_framebuf *pufbuf, int fd, int *done) { struct Xconn *xc; PGconn *conn; TAILQ_FOREACH(xc, &xclist, list) { if (PQsocket(xc->conn) == fd) { break; } } assert(xc != NULL); conn = xc->conn; PQconsumeInput(conn); if (!PQisBusy(conn)) { if (xc->blocker != NULL) { DPRINTF("schedule %p\n", xc->blocker); puffs_cc_schedule(xc->blocker); } else { DPRINTF("no blockers\n"); } } *done = 0; return 0; } int pgfs_connectdb(struct puffs_usermount *pu, const char *dbname, const char *dbuser, bool debug, bool synchronous, unsigned int nconn) { const char *keywords[3+1]; const char *values[3]; unsigned int i; if (nconn > 32) { /* * limit from sizeof(cmd->prepared_mask) */ return EINVAL; } if (debug) { pgfs_dodprintf = true; } if (synchronous) { pgfs_dosync = true; } i = 0; if (dbname != NULL) { keywords[i] = "dbname"; values[i] = dbname; i++; } if (dbuser != NULL) { keywords[i] = "user"; values[i] = dbuser; i++; } keywords[i] = "application_name"; values[i] = "pgfs"; i++; keywords[i] = NULL; puffs_framev_init(pu, pgfs_readframe, NULL, NULL, NULL, NULL); for (i = 0; i < nconn; i++) { struct Xconn *xc; struct Xconn *xc2; static int xcid; PGconn *conn; struct cmd *c; int error; conn = PQconnectdbParams(keywords, values, 0); if (conn == NULL) { errx(EXIT_FAILURE, "PQconnectdbParams: unknown failure"); } if (PQstatus(conn) != CONNECTION_OK) { /* * XXX sleep and retry on ERRCODE_CANNOT_CONNECT_NOW */ errx(EXIT_FAILURE, "PQconnectdbParams: %s", PQerrorMessage(conn)); } DPRINTF("protocol version %d\n", PQprotocolVersion(conn)); puffs_framev_addfd(pu, PQsocket(conn), PUFFS_FBIO_READ); xc = emalloc(sizeof(*xc)); xc->conn = conn; xc->blocker = NULL; xc->owner = NULL; xc->in_trans = false; xc->id = xcid++; xc->label = NULL; assert(xc->id < 32); PQsetNoticeReceiver(conn, pgfs_notice_receiver, xc); TAILQ_INSERT_HEAD(&xclist, xc, list); xc2 = begin(pu, NULL); assert(xc2 == xc); c = createcmd("SET search_path TO pgfs", CMD_NOPREPARE); error = simplecmd(xc, c); assert(error == 0); freecmd(c); c = createcmd("SET SESSION CHARACTERISTICS AS " "TRANSACTION ISOLATION LEVEL REPEATABLE READ", CMD_NOPREPARE); error = simplecmd(xc, c); assert(error == 0); freecmd(c); c = createcmd("SET SESSION TIME ZONE UTC", CMD_NOPREPARE); error = simplecmd(xc, c); assert(error == 0); freecmd(c); if (!pgfs_dosync) { c = createcmd("SET SESSION SYNCHRONOUS_COMMIT TO OFF", CMD_NOPREPARE); error = simplecmd(xc, c); assert(error == 0); freecmd(c); } if (debug) { struct fetchstatus s; static const Oid types[] = { INT8OID, }; uint64_t pid; c = createcmd("SELECT pg_backend_pid()::int8;", CMD_NOPREPARE); error = sendcmd(xc, c); assert(error == 0); fetchinit(&s, xc); error = FETCHNEXT(&s, types, &pid); fetchdone(&s); assert(error == 0); DPRINTF("xc %p backend pid %" PRIu64 "\n", xc, pid); } error = commit(xc); assert(error == 0); assert(xc->owner == NULL); } /* * XXX cleanup unlinked files here? what to do when the filesystem * is shared? */ return 0; } struct waitq flushwaitq = TAILQ_HEAD_INITIALIZER(flushwaitq); struct puffs_cc *flusher = NULL; int flush_xacts(struct puffs_usermount *pu) { struct puffs_cc *cc = puffs_cc_getcc(pu); struct Xconn *xc; static struct cmd *c; uint64_t dummy; int error; /* * flush all previously issued asynchronous transactions. * * XXX * unfortunately it seems that there is no clean way to tell * PostgreSQL flush XLOG. we could perform a CHECKPOINT but it's * too expensive and overkill for our purpose. * besides, PostgreSQL has an optimization to skip XLOG flushing * for transactions which didn't produce WAL records. * (changeset f6a0863e3cb72763490ceca2c558d5ef2dddd5f2) * it means that an empty transaction ("BEGIN; COMMIT;"), which * doesn't produce any WAL records, doesn't flush the XLOG even if * synchronous_commit=on. we issues a dummy setval() to avoid the * optimization. * on the other hand, we try to avoid creating unnecessary WAL activity * by serializing flushing and checking XLOG locations. */ assert(!pgfs_dosync); if (flusher != NULL) { /* serialize flushers */ DPRINTF("%p flush in progress %p\n", cc, flusher); waiton(&flushwaitq, cc); assert(flusher == NULL); } DPRINTF("%p start flushing\n", cc); flusher = cc; retry: xc = begin(pu, "flush"); CREATECMD_NOPARAM(c, "SELECT setval('dummyseq', 1) WHERE " "pg_current_xlog_insert_location() <> pg_current_xlog_location()"); error = sendcmd(xc, c); if (error != 0) { goto got_error; } error = simplefetch(xc, INT8OID, &dummy); assert(error != 0 || dummy == 1); if (error == ENOENT) { /* * there seems to be nothing to flush. */ DPRINTF("%p no sync\n", cc); error = 0; } if (error != 0) { goto got_error; } error = commit_sync(xc); if (error != 0) { goto got_error; } goto done; got_error: rollback(xc); if (error == EAGAIN) { goto retry; } done: assert(flusher == cc); flusher = NULL; wakeup_one(&flushwaitq); DPRINTF("%p end flushing error=%d\n", cc, error); return error; }